diff --git a/docs/source/examples/README.rst b/docs/source/examples/README.rst index 3d5089de2..37b9cfe3c 100644 --- a/docs/source/examples/README.rst +++ b/docs/source/examples/README.rst @@ -8,6 +8,7 @@ Examples - :doc:`distributed_tensors.py `: Shows how to dispatch tensors and tensor level operations to a distributed mesh of workers and GPUs - :doc:`debugging.py `: Shows how to use the Monarch debugger to debug a distributed program - `Multinode Slurm Tutorial `_: Multinode distributed training tutorial using Monarch and Slurm to run an SPMD training job. +- `Running on Kubernetes using Skypilot `_: Run Monarch on Kubernetes and cloud VMs via SkyPilot. .. toctree:: :hidden: diff --git a/docs/source/examples/getting_started.py b/docs/source/examples/getting_started.py index 6c7359f95..476a550bb 100644 --- a/docs/source/examples/getting_started.py +++ b/docs/source/examples/getting_started.py @@ -145,8 +145,8 @@ def get_value(self) -> int: # ============== # When we created our processes before, we spawned them on `this_host()` -- the machine # running the top-level script. For larger jobs, monarch controls many machines. How these -# machines are obtained depends on the scheduling system (slurm, kubernetes, etc), but these -# schedulers are typically encapsulated in a config file. +# machines are obtained depends on the scheduling system (Slurm, Kubernetes, SkyPilot, etc.), +# but these schedulers are typically encapsulated in a config file. from monarch.actor import context, HostMesh, hosts_from_config diff --git a/examples/skypilot/README.md b/examples/skypilot/README.md new file mode 100644 index 000000000..1c267275f --- /dev/null +++ b/examples/skypilot/README.md @@ -0,0 +1,303 @@ +# Running Monarch on Kubernetes and cloud VMs via SkyPilot + +This directory contains examples for running Monarch workloads on **Kubernetes and cloud VMs** via [SkyPilot](https://github.com/skypilot-org/skypilot). + +## Overview + +`SkyPilotJob` provisions cloud instances (or K8s pods) and starts Monarch workers on them, allowing you to run distributed Monarch actors across multiple Kubernetes pods. + +### Architecture + +```mermaid +flowchart TB + subgraph laptop["💻 Your Laptop"] + user["$ sky launch monarch_quickstart.sky.yaml"] + end + + subgraph k8s["☸️ Kubernetes Cluster"] + subgraph driver["Driver Pod"] + script["skypilot_quickstart_driver.py"] + skyjob["SkyPilotJob"] + end + + subgraph workers["Worker Pods (SkyPilot clusters)"] + subgraph w1["Worker Pod 0"] + mw1["Monarch Worker"] + end + subgraph w2["Worker Pod 1"] + mw2["Monarch Worker"] + end + end + end + + user -->|"SkyPilot launches"| driver + script --> skyjob + skyjob -->|"provisioned via SkyPilot"| workers + skyjob <-->|"TCP :22222"| mw1 + skyjob <-->|"TCP :22222"| mw2 + mw1 + mw2 +``` + +**How it works:** +1. You run `sky launch` from your laptop to start the driver pod +2. The driver runs `skypilot_quickstart_driver.py` which creates a `SkyPilotJob` +3. `SkyPilotJob` provisions GPU worker pods via SkyPilot +4. The driver connects to Monarch workers over TCP (port 22222) +5. Actors are spawned on each GPU and execute your distributed code + +**Supported infra:** +- Kubernetes (any cluster) +- Hyperscalers: AWS, GCP, Azure +- Neoclouds: CoreWeave, Nebius, and [20+ other clouds](https://docs.skypilot.co/en/latest/getting-started/installation.html) + +## Quickstart + +Prerequisites: Install SkyPilot and verify GPUs are available. +
+SkyPilot Installation
+
+```bash
+# Install SkyPilot with your preferred backend
+pip install skypilot[kubernetes]  # For Kubernetes
+pip install skypilot[aws]         # For AWS
+pip install skypilot[gcp]         # For GCP
+pip install skypilot[all]         # For all clouds
+
+# Verify SkyPilot setup
+sky check
+
+# Verify GPUs available
+sky show-gpus --infra kubernetes
+```
+
+For more details, see the [SkyPilot documentation](https://docs.skypilot.co/en/latest/getting-started/installation.html).
+
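+If your kubeconfig has more than one context, it is worth confirming which one SkyPilot will use before launching anything. A minimal sketch, assuming the standard `kubectl` and SkyPilot CLIs are on your PATH:
+
+```bash
+# Show the kubeconfig context SkyPilot will pick up by default
+kubectl config current-context
+
+# Re-run the check scoped to Kubernetes only
+sky check kubernetes
+```
+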
+
+
+Run this command from your local machine to launch the getting started example:
+
+```bash
+sky launch monarch_quickstart.sky.yaml -c monarch-demo
+```
+
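+Once the launch is running (or from a second terminal), you can check on the driver cluster and re-attach to its logs. A short sketch using standard SkyPilot commands and the `monarch-demo` cluster name used above:
+
+```bash
+# List SkyPilot clusters and their status
+sky status
+
+# Re-attach to the driver job's log stream
+sky logs monarch-demo
+```
+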
+💡 Customizing the run (GPU count, CPU-only mode, etc.)
+
+Run `sky show-gpus --infra kubernetes` to see available GPUs in your cluster, then customize with environment variables:
+
+```bash
+# Custom GPU configuration
+sky launch monarch_quickstart.sky.yaml -c monarch-demo \
+  --env NUM_HOSTS=4 \
+  --env GPUS_PER_HOST=8 \
+  --env ACCELERATOR="H100:8"
+
+# CPU-only mode (no GPUs required)
+sky launch monarch_quickstart.sky.yaml -c monarch-demo \
+  --env GPUS_PER_HOST=0 \
+  --env ACCELERATOR=none
+```
+
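+These environment variables are forwarded to the driver script by the YAML's `run` section, so a customized launch is equivalent to invoking the driver directly with the matching flags. A sketch of the corresponding direct invocation (run from inside the cluster, as described below):
+
+```bash
+python skypilot_quickstart_driver.py \
+  --cloud kubernetes \
+  --num-hosts 4 \
+  --gpus-per-host 8 \
+  --accelerator "H100:8"
+```
+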
+
+
+When you run `sky launch`, SkyPilot will:
+1. Launch a Kubernetes pod
+2. Install dependencies
+3. Sync the example directory to the pod
+4. Run `skypilot_quickstart_driver.py` in the pod and stream the logs
+
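+Behind the scenes, the driver then provisions the worker pods as a second, nested SkyPilot cluster launched from inside the driver pod. On Kubernetes you can watch the worker pods appear with plain kubectl (pod names depend on your cluster and SkyPilot version):
+
+```bash
+kubectl get pods --watch
+```
+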
+Example Output
+
+```
+============================================================
+Monarch Getting Started with SkyPilot
+============================================================
+
+Configuration:
+  Cloud: kubernetes
+  Hosts: 2
+  GPUs per host: 1
+  Accelerator: H200:1
+  Cluster name: monarch-skypilot-test
+
+[1] Creating SkyPilot job...
+
+[2] Launching cluster and starting Monarch workers...
+No cached job found at path: .monarch/job_state.pkl
+Applying current job
+Launching SkyPilot cluster 'monarch-skypilot-test' with 2 nodes
+Running on cluster: monarch-skypilot-test
+SkyPilot cluster 'monarch-skypilot-test' launched successfully
+Waiting for job 1 setup to complete (timeout=300s)...
+Job 1 status: JobStatus.SETTING_UP (waited 5s)
+Job 1 is now RUNNING (setup complete)
+Saving job to cache at .monarch/job_state.pkl
+Job has started, connecting to current state
+Found 2 nodes ready
+Connecting to workers for mesh 'trainers': ['tcp://10.0.4.22:22222', 'tcp://10.0.4.112:22222']
+Monarch internal logs are being written to /tmp/sky/monarch_log.log; execution id sky_Dec-11_01:31_653
+Waiting for host mesh 'trainers' to initialize...
+Host mesh 'trainers' initialized successfully
+Host mesh 'trainers' ready
+  Got host mesh with extent: {hosts: 2}
+
+[3] Spawning processes on cloud hosts...
+  Process mesh extent: {hosts: 2, gpus: 1}
+
+[4] Spawning Counter actors...
+
+[5] Broadcasting increment to all counters...
+
+[6] Getting counter values...
+  Counter values: ValueMesh({hosts: 2, gpus: 1}):
+  (({'hosts': 0/2, 'gpus': 0/1}, 3), ({'hosts': 1/2, 'gpus': 0/1}, 3))
+
+[7] Spawning Trainer actors...
+
+[8] Performing distributed training step...
+  ({'hosts': 0/2, 'gpus': 0/1}, "Trainer {'hosts': 0/2, 'gpus': 0/1} taking a step.")
+  ({'hosts': 1/2, 'gpus': 0/1}, "Trainer {'hosts': 1/2, 'gpus': 0/1} taking a step.")
+
+[9] Getting trainer info...
+  ({'hosts': 0/2, 'gpus': 0/1}, "Trainer at rank {'hosts': 0/2, 'gpus': 0/1}")
+  ({'hosts': 1/2, 'gpus': 0/1}, "Trainer at rank {'hosts': 1/2, 'gpus': 0/1}")
+
+============================================================
+Success! Monarch actors ran on SkyPilot cluster!
+============================================================
+
+[10] Cleaning up SkyPilot cluster...
+Tearing down SkyPilot cluster 'monarch-skypilot-test'
+Cluster 'monarch-skypilot-test' terminated
+  Cluster terminated.
+```
+
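+The `Counter values` line above is a `ValueMesh`: one value per actor, keyed by its (host, gpu) coordinates. In your own driver you can unpack it per rank instead of printing the whole mesh; a small sketch based on the quickstart driver's `counters` actor mesh:
+
+```python
+# Each entry pairs an actor's coordinates with the value it returned
+for point, value in counters.get_value.call().get():
+    print(point, value)   # e.g. {'hosts': 0/2, 'gpus': 0/1} 3
+```
+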
+
+When done, clean up with:
+```bash
+sky down monarch-demo
+```
+
+
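+If a run is interrupted before it gets to clean up, clusters can be left behind. Check for leftovers from the environment that launched them (your laptop for the driver pod, the driver pod itself for the worker cluster):
+
+```bash
+sky status       # list clusters that are still up
+sky down --all   # remove all of your SkyPilot clusters
+```
+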
+Running from within the Kubernetes cluster
+
+If you are already inside the Kubernetes cluster where you'd like to run workers, you can run `skypilot_quickstart_driver.py` directly.
+
+```bash
+# With GPUs
+python skypilot_quickstart_driver.py --cloud kubernetes --num-hosts 2 --gpus-per-host 8 --accelerator "H200:8"
+
+# CPU-only (no GPUs)
+python skypilot_quickstart_driver.py --cloud kubernetes --num-hosts 2 --gpus-per-host 0 --accelerator none
+```
+
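+When the driver itself runs inside a pod that SkyPilot launched, the in-cluster context hint must be cleared so the driver can launch nested worker clusters on the same Kubernetes cluster. The example YAML and scripts already do this; if you are wiring things up yourself, the relevant steps (taken from the `setup` section of `monarch_quickstart.sky.yaml`) look like:
+
+```bash
+# Allow launching nested SkyPilot clusters from inside this pod
+unset SKYPILOT_IN_CLUSTER_CONTEXT_NAME
+sky api start
+
+# Confirm Kubernetes access from inside the pod
+sky check kubernetes
+```
+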
+ +### Running the DDP Jupyter Notebook + +To run the `skypilot_ddp.ipynb` notebook interactively, first launch a driver pod and then connect via SSH port forwarding: + +```bash +# 1. Launch a driver pod (without running a script) +sky launch monarch_quickstart.sky.yaml -c monarch-demo + +# 2. SSH into the pod with port forwarding for Jupyter +ssh monarch-demo -L 8888:localhost:8888 + +# 3. Inside the pod, start Jupyter Notebook +cd ~/sky_workdir +uv pip install --system jupyter +jupyter notebook --no-browser --port=8888 --ip=0.0.0.0 --NotebookApp.token='' --NotebookApp.password='' --allow-root +``` + +Then open http://localhost:8888 in your browser and open `skypilot_ddp.ipynb`. + +When done, clean up with: +```bash +sky down monarch-demo +``` + +## SkyPilotJob Class + +SkyPilotJob allows you to run Monarch on Kubernetes and cloud VMs via SkyPilot. + +Example usage: + +```python +import sky +from monarch_skypilot import SkyPilotJob +from monarch.actor import Actor, endpoint + +class MyActor(Actor): + @endpoint + def hello(self) -> str: + return "Hello from the cloud!" + +# Create a SkyPilot job with 2 nodes +job = SkyPilotJob( + meshes={"workers": 2}, + resources=sky.Resources( + cloud=sky.Kubernetes(), # or sky.AWS(), sky.GCP(), etc. + accelerators="H100:1", + ), + cluster_name="my-monarch-cluster", + idle_minutes_to_autostop=10, + down_on_autostop=True, +) + +# Launch and connect +state = job.state() +hosts = state.workers + +# Spawn processes and actors +procs = hosts.spawn_procs(per_host={"gpus": 1}) +actors = procs.spawn("my_actors", MyActor) + +# Use your actors +results = actors.hello.call().get() +print(results) # ["Hello from the cloud!", "Hello from the cloud!"] + +# Clean up +job.kill() +``` + +### Network Requirements + +The client must have direct network connectivity to the worker nodes: +- **Kubernetes**: Run the client inside the same cluster (e.g., in a pod) +- **Cloud VMs**: Ensure security groups allow inbound traffic on port 22222 + + +### Default Image + +By default, `SkyPilotJob` uses the `pytorch/pytorch:2.9.1-cuda12.8-cudnn9-runtime` Docker image which has compatible system libraries for `torchmonarch-nightly`. + +## Troubleshooting tips + +**Check SkyPilot setup:** +```bash +sky check +sky show-gpus +``` + +**View cluster logs:** +```bash +sky logs +``` + +**SSH into a worker:** +```bash +sky ssh +``` + +**Clean up clusters:** +```bash +sky down +sky down --all # Remove all clusters +``` diff --git a/examples/skypilot/monarch_quickstart.sky.yaml b/examples/skypilot/monarch_quickstart.sky.yaml new file mode 100644 index 000000000..393f95176 --- /dev/null +++ b/examples/skypilot/monarch_quickstart.sky.yaml @@ -0,0 +1,89 @@ +# SkyPilot YAML for running the Monarch Quickstart example. +# +# This YAML file syncs the example directory, installs dependencies, +# and runs the quickstart example. 
+# +# Usage: +# cd monarch/examples/skypilot +# sky launch monarch_quickstart.sky.yaml -c monarch-demo +# +# # For CPU-only clusters (no GPUs): +# sky launch monarch_quickstart.sky.yaml -c monarch-demo --env GPUS_PER_HOST=0 --env ACCELERATOR=none +# +# To view logs: +# sky logs monarch-demo +# +# To SSH into the cluster: +# sky ssh monarch-demo +# +# To tear down: +# sky down monarch-demo + +name: monarch-quickstart + +resources: + cloud: kubernetes # Optional, remove or change to your preferred cloud provider + cpus: 2+ # No GPUs needed for the driver script + image_id: docker:pytorch/pytorch:2.9.1-cuda12.8-cudnn9-runtime + +# Environment variables for configuring the example +# Override with: sky launch ... --env NUM_HOSTS=4 --env GPUS_PER_HOST=8 +envs: + NUM_HOSTS: 2 # Number of worker nodes to provision + GPUS_PER_HOST: 1 # GPUs per worker (set to 0 for CPU-only) + ACCELERATOR: "H200:1" # SkyPilot GPU spec (set to "none" for CPU-only). Keep quantity aligned with GPUS_PER_HOST. + +# Sync the current directory (examples/skypilot) to the cluster +workdir: . + +setup: | + set -ex + + echo "=== Installing system dependencies ===" + # Install socat (required for SkyPilot Kubernetes portforward networking) and curl + apt-get update && apt-get install -y socat curl + + # Install kubectl for Kubernetes cluster management + echo "=== Installing kubectl ===" + curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + chmod +x kubectl + mv kubectl /usr/local/bin/ + kubectl version --client || echo "kubectl installed" + + echo "=== Installing Python dependencies ===" + uv pip install --system torchmonarch-nightly + # Install SkyPilot with Kubernetes support for launching nested clusters + uv pip install --system "skypilot[kubernetes]" + + # Verify installations + python -c "import monarch; print(f'Monarch installed: {monarch}')" + python -c "import sky; print(f'SkyPilot installed: {sky}')" + + # Configure SkyPilot to use in-cluster Kubernetes context + # This allows the driver pod to launch nested SkyPilot clusters + unset SKYPILOT_IN_CLUSTER_CONTEXT_NAME + sky api start + + # Verify Kubernetes access + echo "=== Verifying Kubernetes access ===" + sky check kubernetes + + echo "=== GPUs available ===" + sky show-gpus --infra kubernetes + + echo "=== Setup complete ===" + +run: | + echo "=== Running Monarch Quickstart with SkyPilot ===" + echo "Configuration: NUM_HOSTS=$NUM_HOSTS, GPUS_PER_HOST=$GPUS_PER_HOST, ACCELERATOR=$ACCELERATOR" + + # Run the getting started example + # Uses environment variables set above (can be overridden with --env) + python skypilot_quickstart_driver.py \ + --cloud kubernetes \ + --num-hosts $NUM_HOSTS \ + --gpus-per-host $GPUS_PER_HOST \ + --cluster-name monarch-workers \ + --accelerator "$ACCELERATOR" + + echo "=== Example ran successfully ===" diff --git a/examples/skypilot/monarch_skypilot/__init__.py b/examples/skypilot/monarch_skypilot/__init__.py new file mode 100644 index 000000000..d590260cd --- /dev/null +++ b/examples/skypilot/monarch_skypilot/__init__.py @@ -0,0 +1,20 @@ +""" +Monarch SkyPilot Integration Package. + +This package provides SkyPilotJob - a way to run Monarch workloads on +Kubernetes and cloud VMs via SkyPilot. 
+ +Usage: + from monarch_skypilot import SkyPilotJob + + job = SkyPilotJob( + meshes={"workers": 2}, + resources=sky.Resources(cloud=sky.Kubernetes(), accelerators="H100:1"), + ) + state = job.state() +""" + +from .skypilot_job import SkyPilotJob + +__all__ = ["SkyPilotJob"] + diff --git a/examples/skypilot/monarch_skypilot/skypilot_job.py b/examples/skypilot/monarch_skypilot/skypilot_job.py new file mode 100644 index 000000000..54d5a6268 --- /dev/null +++ b/examples/skypilot/monarch_skypilot/skypilot_job.py @@ -0,0 +1,474 @@ +""" +Monarch JobTrait implementation for SkyPilot. + +SkyPilotJob allows running Monarch on Kubernetes and cloud VMs via SkyPilot. + +Requirements: + - pip install torchmonarch-nightly (or torchmonarch) + - pip install skypilot[kubernetes] (or other cloud backends) +""" + +import logging +import os +import time +from typing import Dict, List, Optional, TYPE_CHECKING + +from monarch._src.job.job import JobState, JobTrait + +# If running inside a SkyPilot cluster, unset the in-cluster context variable +# to allow launching new clusters on the same Kubernetes cluster. +# This must be done before importing sky to affect the API server. +if "SKYPILOT_IN_CLUSTER_CONTEXT_NAME" in os.environ: + del os.environ["SKYPILOT_IN_CLUSTER_CONTEXT_NAME"] + +if TYPE_CHECKING: + import sky + +try: + import sky + HAS_SKYPILOT = True +except ImportError: + HAS_SKYPILOT = False + sky = None # type: ignore[assignment] + + +logger: logging.Logger = logging.getLogger(__name__) + +# Default port for Monarch TCP communication +MONARCH_WORKER_PORT = 22222 + +# Timeout for waiting for the job to reach RUNNING status. +JOB_TIMEOUT = 300 # seconds + +# Default setup commands to install Monarch from PyPI on remote workers. +# Requires a Docker image with Ubuntu 22.04+ with RDMA dependencies. +# In this implementation, we default to pytorch/pytorch:2.9.1-cuda12.8-cudnn9-runtime image. +# +# For faster cold starts (<30s), use a custom Docker image with Monarch pre-installed. +DEFAULT_SETUP_COMMANDS = """ +set -ex + +# Install torchmonarch from PyPI +uv pip install --system torchmonarch-nightly + +echo "Done installing Monarch" +""" +DEFAULT_IMAGE_ID = "docker:pytorch/pytorch:2.9.1-cuda12.8-cudnn9-runtime" + + +def _configure_transport() -> None: + """Configure the Monarch transport using the public API.""" + from monarch.actor import enable_transport + enable_transport("tcp") + + +def _attach_to_workers_wrapper(name: str, ca: str, workers: List[str]): + """Wrapper around attach_to_workers with deferred import.""" + from monarch._src.actor.bootstrap import attach_to_workers + + return attach_to_workers(name=name, ca=ca, workers=workers) + + +class SkyPilotJob(JobTrait): + """ + SkyPilotJob to provision and manage Monarch workers K8s and cloud VMs. + + SkyPilot supports multiple backends - Kubernetes and VMs on AWS, GCP, Azure, + CoreWeave, Nebius, and 20+ other clouds. + + This implementation: + 1. Uses sky.launch() to provision cloud instances with specified resources + 2. Runs Monarch workers on each node via a startup script + 3. Connects to workers using their IP addresses from the cluster handle + + Caveats: + * For Kubernetes, the driver/client must be run inside the same cluster. + TOOD(romilb): Explore if loadbalancer can be used to connect to workers. + + Example: + >>> import sky + >>> from monarch_skypilot import SkyPilotJob + >>> + >>> job = SkyPilotJob( + ... meshes={"trainers": 2}, + ... resources=sky.Resources(accelerators="A100:1"), + ... cluster_name="my-monarch-cluster", + ... 
) + >>> state = job.state() + >>> trainers = state.trainers # HostMesh with 2 nodes + """ + + def __init__( + self, + meshes: Dict[str, int], + resources: Optional["sky.Resources"] = None, + cluster_name: Optional[str] = None, + monarch_port: int = MONARCH_WORKER_PORT, + idle_minutes_to_autostop: Optional[int] = None, + down_on_autostop: bool = True, + python_exe: str = "python", + setup_commands: Optional[str] = None, + workdir: Optional[str] = None, + file_mounts: Optional[Dict[str, str]] = None, + ) -> None: + """ + Args: + meshes: Dictionary mapping mesh names to number of nodes. + e.g., {"trainers": 4, "dataloaders": 2} + resources: SkyPilot Resources specification for the instances. + If None, uses SkyPilot defaults. + cluster_name: Name for the SkyPilot cluster. If None, auto-generated. + monarch_port: Port bootstrapping communication between Monarch workers. + idle_minutes_to_autostop: If set, cluster will autostop after this + many minutes of idleness. + down_on_autostop: If True, tear down cluster on autostop instead of + just stopping it. On Kubernetes, autostop is not + supported and this must be set to True. Pods will + be deleted when the SkyPilot cluster is downed. + python_exe: Python executable to use for worker processes. + setup_commands: Optional setup commands to run before starting workers. + If None, uses DEFAULT_SETUP_COMMANDS which installs + torchmonarch-nightly from PyPI. + workdir: Local directory to sync to the cluster. If provided, this + directory will be uploaded to ~/sky_workdir on each node. + file_mounts: Dictionary mapping remote paths to local paths for + additional file mounts. + """ + if not HAS_SKYPILOT: + raise ImportError( + "SkyPilot is not installed. Install it with: pip install skypilot[kubernetes]" + ) + + # Configure transport at runtime when Monarch is available + try: + _configure_transport() + except ImportError: + # Monarch bindings not available, will fail later when needed + pass + + super().__init__() + + self._meshes = meshes + self._resources = resources + self._cluster_name = cluster_name + self._port = monarch_port + self._idle_minutes_to_autostop = idle_minutes_to_autostop + self._down_on_autostop = down_on_autostop + self._python_exe = python_exe + self._setup_commands = setup_commands + self._workdir = workdir + self._file_mounts = file_mounts + + # Runtime state + self._launched_cluster_name: Optional[str] = None + self._node_ips: List[str] = [] + + def _cleanup_on_failure(self) -> None: + """Clean up cluster resources on failure.""" + if self._launched_cluster_name: + try: + logger.warning(f"Cleaning up cluster '{self._launched_cluster_name}' after failure") + request_id = sky.down(self._launched_cluster_name) + sky.get(request_id) + logger.info(f"Cluster '{self._launched_cluster_name}' cleaned up") + except Exception as cleanup_error: + logger.warning(f"Failed to cleanup cluster: {cleanup_error}") + finally: + self._launched_cluster_name = None + self._node_ips.clear() + + def _create(self, client_script: Optional[str]) -> None: + """Launch a SkyPilot cluster and start Monarch workers.""" + if client_script is not None: + raise RuntimeError("SkyPilotJob cannot run batch-mode scripts yet") + + total_nodes = sum(self._meshes.values()) + + # Build the worker startup command + worker_command = self._build_worker_command() + + # Use provided setup commands or default to PyPI install + setup = self._setup_commands if self._setup_commands is not None else DEFAULT_SETUP_COMMANDS + if setup and not setup.endswith("\n"): + setup += "\n" 
+ + # Create the SkyPilot task + task = sky.Task( + name="monarch-workers", + setup=setup if setup else None, + run=worker_command, + num_nodes=total_nodes, + workdir=self._workdir, + ) + + # Add file mounts if provided + if self._file_mounts: + task.set_file_mounts(self._file_mounts) + + # Set resources, using default image_id if not specified + resources = self._resources + if resources is not None: + if resources.image_id is None: + resources = resources.copy(image_id=DEFAULT_IMAGE_ID) + task.set_resources(resources) + else: + task.set_resources(sky.Resources(image_id=DEFAULT_IMAGE_ID)) + + # Generate cluster name if not provided + cluster_name = self._cluster_name or f"monarch-{os.getpid()}" + + # Set early so cleanup can work if later steps fail + self._launched_cluster_name = cluster_name + + logger.info(f"Launching SkyPilot cluster '{cluster_name}' with {total_nodes} nodes") + + # Launch the cluster + try: + request_id = sky.launch( + task, + cluster_name=cluster_name, + idle_minutes_to_autostop=self._idle_minutes_to_autostop, + down=self._down_on_autostop, + ) + # Get the result from the request + job_id, handle = sky.get(request_id) + except Exception as e: + logger.error(f"Failed to launch SkyPilot cluster: {e}") + self._cleanup_on_failure() + raise RuntimeError(f"Failed to launch SkyPilot cluster: {e}") from e + + logger.info(f"SkyPilot cluster '{cluster_name}' launched successfully") + + # Wait for the job to be RUNNING (setup complete, run started) + try: + self._wait_for_job_running(cluster_name, job_id, timeout=JOB_TIMEOUT) + except Exception as e: + logger.error(f"Job failed to reach RUNNING status: {e}") + self._cleanup_on_failure() + raise + + def _wait_for_job_running(self, cluster_name: str, job_id: int, timeout: int = JOB_TIMEOUT) -> None: + """Wait for the SkyPilot job to reach RUNNING status (setup complete).""" + start_time = time.time() + poll_interval = 10 # seconds + + logger.info(f"Waiting for job {job_id} setup to complete (timeout={timeout}s)...") + + while time.time() - start_time < timeout: + try: + # Get job queue for the cluster + request_id = sky.queue(cluster_name) + jobs = sky.get(request_id) + + # Find our job + for job in jobs: + if job.get('id') == job_id or job.get('job_id') == job_id: + status = job.get('status', '') + status_str = str(status) + if 'RUNNING' in status_str: + logger.info(f"Job {job_id} is now RUNNING (setup complete)") + return + elif 'FAILED' in status_str or 'CANCELLED' in status_str: + raise RuntimeError(f"Job {job_id} failed with status: {status}. 
Check logs with: sky logs {cluster_name}") + else: + elapsed = int(time.time() - start_time) + logger.info(f"Job {job_id} status: {status} (waited {elapsed}s)") + break + + except Exception as e: + logger.warning(f"Error checking job status: {e}") + + time.sleep(poll_interval) + + raise RuntimeError(f"Timeout waiting for job {job_id} to reach RUNNING status") + + def _build_worker_command(self) -> str: + """Build the bash command to start Monarch workers on each node.""" + # This command will be run on each node via SkyPilot + # SkyPilot expects a bash script, so we wrap Python code in python -c + # Note: Use IP address (not hostname) for the worker address since + # Kubernetes hostnames may not resolve across pods + python_code = f''' +import socket +import logging +import sys + +# Enable verbose logging +logging.basicConfig(level=logging.DEBUG, stream=sys.stdout, format="%(asctime)s %(levelname)s %(name)s: %(message)s") + +hostname = socket.gethostname() +ip_addr = socket.gethostbyname(hostname) +address = f"tcp://{{ip_addr}}:{self._port}" +print(f"Starting Monarch worker at {{address}} (hostname={{hostname}})", flush=True) +sys.stdout.flush() + +try: + from monarch.actor import run_worker_loop_forever + print(f"Imported run_worker_loop_forever successfully", flush=True) + print(f"Worker ready and listening...", flush=True) + run_worker_loop_forever(address=address, ca="trust_all_connections") +except Exception as e: + print(f"ERROR in worker: {{e}}", flush=True) + import traceback + traceback.print_exc() + raise +''' + # Escape single quotes in the Python code for bash + escaped_code = python_code.replace("'", "'\"'\"'") + # Set timeout env vars + env_vars = " ".join([ + f"export HYPERACTOR_HOST_SPAWN_READY_TIMEOUT={JOB_TIMEOUT}s", + f"export HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT={JOB_TIMEOUT}s", + f"export HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE={JOB_TIMEOUT}s", + ]) + return f"{env_vars} && {self._python_exe} -c '{escaped_code}'" + + def _get_node_ips(self) -> List[str]: + """Get the IP addresses of all nodes in the cluster.""" + if not self._launched_cluster_name: + raise RuntimeError("Cluster has not been launched yet") + + # Query cluster status to get handle with node IPs + try: + request_id = sky.status(cluster_names=[self._launched_cluster_name]) + statuses = sky.get(request_id) + except Exception as e: + raise RuntimeError(f"Failed to get cluster status: {e}") from e + + if not statuses: + raise RuntimeError( + f"Cluster '{self._launched_cluster_name}' not found" + ) + + status = statuses[0] + handle = status.handle + + if handle is None: + raise RuntimeError( + f"Cluster '{self._launched_cluster_name}' has no handle" + ) + + # Get the external IPs from the handle + if handle.stable_internal_external_ips is None: + raise RuntimeError("Cluster has no IP information") + + # stable_internal_external_ips is List[Tuple[internal_ip, external_ip]] + # We use external IPs to connect + ips = [] + for internal_ip, external_ip in handle.stable_internal_external_ips: + # Prefer external IP, fall back to internal + ip = external_ip if external_ip else internal_ip + if ip: + ips.append(ip) + + if not ips: + raise RuntimeError("No IP addresses found for cluster nodes") + + return ips + + def _wait_for_workers_ready( + self, expected_nodes: int, timeout: int = 300, poll_interval: int = 5 + ) -> List[str]: + """Wait for workers to be ready and return their addresses.""" + start_time = time.time() + + while time.time() - start_time < timeout: + try: + ips = self._get_node_ips() + if len(ips) >= 
expected_nodes: + logger.info(f"Found {len(ips)} nodes ready") + return ips + except Exception as e: + logger.debug(f"Waiting for workers: {e}") + + time.sleep(poll_interval) + + raise RuntimeError( + f"Timeout waiting for {expected_nodes} workers after {timeout}s" + ) + + def _state(self) -> JobState: + """Get the current state with HostMesh objects for each mesh.""" + if not self._jobs_active(): + raise RuntimeError("SkyPilot cluster is not active") + + # Get node IPs if not cached + if not self._node_ips: + total_nodes = sum(self._meshes.values()) + self._node_ips = self._wait_for_workers_ready(total_nodes) + + # Distribute IPs among meshes + host_meshes = {} + ip_idx = 0 + + for mesh_name, num_nodes in self._meshes.items(): + mesh_ips = self._node_ips[ip_idx : ip_idx + num_nodes] + ip_idx += num_nodes + + workers = [f"tcp://{ip}:{self._port}" for ip in mesh_ips] + logger.info(f"Connecting to workers for mesh '{mesh_name}': {workers}") + + host_mesh = _attach_to_workers_wrapper( + name=mesh_name, + ca="trust_all_connections", + workers=workers, + ) + + # Wait for the host mesh to be initialized (connections established) + logger.info(f"Waiting for host mesh '{mesh_name}' to initialize...") + host_mesh.initialized.get() + logger.info(f"Host mesh '{mesh_name}' initialized successfully") + + # Give connections a moment to fully stabilize + time.sleep(5) + logger.info(f"Host mesh '{mesh_name}' ready") + + host_meshes[mesh_name] = host_mesh + + return JobState(host_meshes) + + def can_run(self, spec: "JobTrait") -> bool: + """Check if this job can run the given spec.""" + if not isinstance(spec, SkyPilotJob): + return False + + return ( + spec._meshes == self._meshes + and spec._resources == self._resources + and spec._port == self._port + and self._jobs_active() + ) + + def _jobs_active(self) -> bool: + """Check if the SkyPilot cluster is still active.""" + if not self.active or not self._launched_cluster_name: + return False + + try: + request_id = sky.status(cluster_names=[self._launched_cluster_name]) + statuses = sky.get(request_id) + + if not statuses: + return False + + status = statuses[0] + # Check if cluster is UP + return status.status == sky.ClusterStatus.UP + except Exception as e: + logger.warning(f"Error checking cluster status: {e}") + return False + + def _kill(self) -> None: + """Tear down the SkyPilot cluster.""" + if self._launched_cluster_name is not None: + try: + logger.info(f"Tearing down SkyPilot cluster '{self._launched_cluster_name}'") + request_id = sky.down(self._launched_cluster_name) + sky.get(request_id) + logger.info(f"Cluster '{self._launched_cluster_name}' terminated") + except Exception as e: + logger.warning(f"Failed to tear down cluster: {e}") + + self._launched_cluster_name = None + self._node_ips.clear() + diff --git a/examples/skypilot/skypilot_ddp.ipynb b/examples/skypilot/skypilot_ddp.ipynb new file mode 100644 index 000000000..ddcb1b0b1 --- /dev/null +++ b/examples/skypilot/skypilot_ddp.ipynb @@ -0,0 +1,306 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Monarch DDP Example with SkyPilot\n", + "\n", + "This notebook demonstrates running PyTorch DDP (DistributedDataParallel) training on cloud infrastructure provisioned by SkyPilot.\n", + "\n", + "Adapted from the SLURM DDP example (`slurm_ddp.ipynb`).\n", + "\n", + "## Prerequisites\n", + "\n", + "```bash\n", + "pip install torchmonarch-nightly\n", + "pip install skypilot[kubernetes] # or skypilot[aws], skypilot[gcp], etc.\n", + "sky check # Verify SkyPilot 
configuration\n", + "```\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Imports and Setup\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "# Set timeouts before importing monarch\n", + "os.environ[\"HYPERACTOR_HOST_SPAWN_READY_TIMEOUT\"] = \"300s\"\n", + "os.environ[\"HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT\"] = \"300s\"\n", + "os.environ[\"HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE\"] = \"300s\"\n", + "\n", + "import torch\n", + "import torch.distributed as dist\n", + "import torch.nn as nn\n", + "import torch.optim as optim\n", + "\n", + "import sky\n", + "from monarch.actor import Actor, current_rank, endpoint\n", + "from monarch.utils import setup_env_for_distributed\n", + "from torch.nn.parallel import DistributedDataParallel as DDP\n", + "\n", + "# Import SkyPilotJob from local package\n", + "from monarch_skypilot import SkyPilotJob\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Define the Model and DDP Actor\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class ToyModel(nn.Module):\n", + " \"\"\"A simple toy model for demonstration purposes.\"\"\"\n", + "\n", + " def __init__(self):\n", + " super(ToyModel, self).__init__()\n", + " self.net1 = nn.Linear(10, 10)\n", + " self.relu = nn.ReLU()\n", + " self.net2 = nn.Linear(10, 5)\n", + "\n", + " def forward(self, x):\n", + " return self.net2(self.relu(self.net1(x)))\n", + "\n", + "\n", + "class DDPActor(Actor):\n", + " \"\"\"This Actor wraps the basic functionality from Torch's DDP example.\n", + "\n", + " Adapted from: https://docs.pytorch.org/tutorials/intermediate/ddp_tutorial.html#basic-use-case\n", + " \"\"\"\n", + "\n", + " def __init__(self):\n", + " self.rank = current_rank().rank\n", + "\n", + " @endpoint\n", + " async def setup(self) -> str:\n", + " \"\"\"Initialize the PyTorch distributed process group.\"\"\"\n", + " WORLD_SIZE = int(os.environ[\"WORLD_SIZE\"])\n", + " dist.init_process_group(\"gloo\", rank=self.rank, world_size=WORLD_SIZE)\n", + " return f\"Rank {self.rank}: Initialized distributed (world_size={WORLD_SIZE})\"\n", + "\n", + " @endpoint\n", + " async def cleanup(self) -> str:\n", + " \"\"\"Clean up the PyTorch distributed process group.\"\"\"\n", + " dist.destroy_process_group()\n", + " return f\"Rank {self.rank}: Cleaned up distributed\"\n", + "\n", + " @endpoint\n", + " async def demo_basic(self) -> str:\n", + " \"\"\"Run a basic DDP training example.\"\"\"\n", + " local_rank = int(os.environ[\"LOCAL_RANK\"])\n", + " model = ToyModel().to(local_rank)\n", + " ddp_model = DDP(model, device_ids=[local_rank])\n", + "\n", + " loss_fn = nn.MSELoss()\n", + " optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)\n", + "\n", + " optimizer.zero_grad()\n", + " outputs = ddp_model(torch.randn(20, 10))\n", + " labels = torch.randn(20, 5).to(local_rank)\n", + " loss = loss_fn(outputs, labels)\n", + " loss.backward()\n", + " optimizer.step()\n", + "\n", + " return f\"Rank {self.rank}: Training step complete (loss={loss.item():.4f})\"\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Configuration\n", + "\n", + "Configure your cloud provider, cluster size, and GPU type below:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Configuration - modify these values as needed\n", + "CLOUD = \"kubernetes\" # 
Options: kubernetes, aws, gcp, azure\n", + "NUM_HOSTS = 2\n", + "GPUS_PER_HOST = 1\n", + "CLUSTER_NAME = \"monarch-ddp\"\n", + "ACCELERATOR = \"H200:1\" # e.g., H100:1, A100:1, V100:1\n", + "\n", + "def get_cloud(cloud_name: str):\n", + " \"\"\"Get SkyPilot cloud object from name.\"\"\"\n", + " clouds = {\n", + " \"kubernetes\": sky.Kubernetes,\n", + " \"aws\": sky.AWS,\n", + " \"gcp\": sky.GCP,\n", + " \"azure\": sky.Azure,\n", + " }\n", + " if cloud_name.lower() not in clouds:\n", + " raise ValueError(f\"Unknown cloud: {cloud_name}. Available: {list(clouds.keys())}\")\n", + " return clouds[cloud_name.lower()]()\n", + "\n", + "print(f\"Configuration:\")\n", + "print(f\" Cloud: {CLOUD}\")\n", + "print(f\" Hosts: {NUM_HOSTS}\")\n", + "print(f\" GPUs per host: {GPUS_PER_HOST}\")\n", + "print(f\" Accelerator: {ACCELERATOR}\")\n", + "print(f\" Cluster name: {CLUSTER_NAME}\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Create SkyPilot Job\n", + "\n", + "Create a SkyPilot job to provision cloud instances:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job = SkyPilotJob(\n", + " meshes={\"mesh0\": NUM_HOSTS},\n", + " resources=sky.Resources(\n", + " cloud=get_cloud(CLOUD),\n", + " accelerators=ACCELERATOR,\n", + " ),\n", + " cluster_name=CLUSTER_NAME,\n", + " idle_minutes_to_autostop=10,\n", + " down_on_autostop=True,\n", + ")\n", + "\n", + "print(f\"SkyPilot job created for cluster '{CLUSTER_NAME}'\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Launch Cluster and Create Process Mesh\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Launch the cluster and get the job state\n", + "print(\"Launching SkyPilot cluster...\")\n", + "job_state = job.state()\n", + "\n", + "# Create process mesh with GPUs\n", + "print(\"Creating process mesh...\")\n", + "proc_mesh = job_state.mesh0.spawn_procs({\"gpus\": GPUS_PER_HOST})\n", + "print(f\"Process mesh extent: {proc_mesh.extent}\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Spawn DDP Actors and Run Training\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Spawn DDP actors on the process mesh\n", + "print(\"Spawning DDP actors...\")\n", + "ddp_actor = proc_mesh.spawn(\"ddp_actor\", DDPActor)\n", + "\n", + "# Set up the distributed environment\n", + "print(\"Setting up distributed environment...\")\n", + "await setup_env_for_distributed(proc_mesh)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Run the DDP example\n", + "print(\"Running DDP training...\\n\")\n", + "\n", + "# Initialize distributed process group\n", + "print(\"[1] Initializing distributed process group...\")\n", + "results = await ddp_actor.setup.call()\n", + "for coord, msg in results:\n", + " print(f\" {msg}\")\n", + "\n", + "# Run the basic DDP training example\n", + "print(\"\\n[2] Running DDP training step...\")\n", + "results = await ddp_actor.demo_basic.call()\n", + "for coord, msg in results:\n", + " print(f\" {msg}\")\n", + "\n", + "# Clean up distributed process group\n", + "print(\"\\n[3] Cleaning up distributed process group...\")\n", + "results = await ddp_actor.cleanup.call()\n", + "for coord, msg in results:\n", + " print(f\" {msg}\")\n", + "\n", + "print(\"\\n\" + \"=\" * 60)\n", + 
"print(\"DDP example completed successfully!\")\n", + "print(\"=\" * 60)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleanup\n", + "\n", + "Tear down the SkyPilot cluster when done:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Tear down the SkyPilot cluster\n", + "print(\"Cleaning up SkyPilot cluster...\")\n", + "job.kill()\n", + "print(\"Done!\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/skypilot/skypilot_quickstart_driver.py b/examples/skypilot/skypilot_quickstart_driver.py new file mode 100644 index 000000000..9319efcb6 --- /dev/null +++ b/examples/skypilot/skypilot_quickstart_driver.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +""" +Running Monarch on Kubernetes with SkyPilot +=========================================== + +This script demonstrates running Monarch actors on cloud infrastructure +provisioned by SkyPilot (Kubernetes or cloud VMs). + +Prerequisites: + pip install torchmonarch-nightly + pip install skypilot[kubernetes] # or skypilot[aws], skypilot[gcp], etc. + sky check # Verify SkyPilot configuration + sky show-gpus --infra kubernetes # Verify GPUs available + +Usage: + # Run on Kubernetes with 2 nodes, 8 GPUs per node + python skypilot_quickstart_driver.py --cloud kubernetes --num-hosts 2 --gpus-per-host 8 --accelerator "H200:8" + + # Run on cloud VMs + python skypilot_quickstart_driver.py --cloud --num-hosts 2 --gpus-per-host 1 --accelerator "H100:1" + + # Run on CPU-only cluster (no GPUs) + python skypilot_quickstart_driver.py --cloud kubernetes --num-hosts 2 --gpus-per-host 0 --accelerator none +""" + +import argparse +import os +import sys + +# Set timeouts before importing monarch +os.environ["HYPERACTOR_HOST_SPAWN_READY_TIMEOUT"] = "300s" +os.environ["HYPERACTOR_MESSAGE_DELIVERY_TIMEOUT"] = "300s" +os.environ["HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE"] = "300s" + +# If running inside a SkyPilot cluster, unset the in-cluster context +# to allow launching new clusters on the same Kubernetes cluster +if "SKYPILOT_IN_CLUSTER_CONTEXT_NAME" in os.environ: + del os.environ["SKYPILOT_IN_CLUSTER_CONTEXT_NAME"] + +# Check dependencies before importing +try: + import sky +except ImportError: + print("ERROR: SkyPilot is not installed. Run: pip install skypilot[kubernetes]") + sys.exit(1) + +try: + from monarch.actor import Actor, endpoint, ProcMesh, context +except ImportError as e: + print(f"ERROR: Monarch is not properly installed: {e}") + print("Run: pip install torchmonarch-nightly") + sys.exit(1) + +# Import SkyPilotJob from the local package +from monarch_skypilot import SkyPilotJob + +# ============================================================================ +# Step 1: Define actors +# ============================================================================ + + +class Counter(Actor): + """A simple counter actor that demonstrates basic messaging.""" + + def __init__(self, initial_value: int = 0): + self.value = initial_value + + @endpoint + def increment(self) -> None: + self.value += 1 + + @endpoint + def get_value(self) -> int: + return self.value + + +class Trainer(Actor): + """A trainer actor that demonstrates distributed training patterns.""" + + @endpoint + def step(self) -> str: + my_point = context().message_rank + return f"Trainer {my_point} taking a step." 
+ + @endpoint + def get_info(self) -> str: + rank = context().actor_instance.rank + return f"Trainer at rank {rank}" + + +# ============================================================================ +# Step 2: Create a SkyPilot Job to provision k8s pods/cloud VMs +# ============================================================================ + + +def get_cloud(cloud_name: str): + """Get SkyPilot cloud object from name.""" + clouds = { + "kubernetes": sky.Kubernetes, + "aws": sky.AWS, + "gcp": sky.GCP, + "azure": sky.Azure, + "nebius": sky.Nebius, + # "slurm": sky.Slurm, + # "ssh": sky.SSH, + # TODO(romilb): Add other clouds + } + if cloud_name.lower() not in clouds: + raise ValueError(f"Unknown cloud: {cloud_name}. Available: {list(clouds.keys())}") + return clouds[cloud_name.lower()]() + + +def main(): + parser = argparse.ArgumentParser(description="Monarch Getting Started with SkyPilot") + parser.add_argument( + "--cloud", + default="kubernetes", + help="Cloud provider to use (kubernetes, aws, gcp, azure, ssh)", + ) + parser.add_argument( + "--num-hosts", + type=int, + default=2, + help="Number of host nodes to provision", + ) + parser.add_argument( + "--gpus-per-host", + type=int, + default=1, + help="Number of GPU processes per host", + ) + parser.add_argument( + "--cluster-name", + default="monarch-getting-started", + help="Name for the SkyPilot cluster", + ) + parser.add_argument( + "--accelerator", + default="H200:1", + help="GPU accelerator to request (e.g., H100:1, A100:1, V100:1)", + ) + parser.add_argument( + "--region", + default=None, + help="Cloud region/Kubernetes context to use", + ) + args = parser.parse_args() + + # Determine if running in CPU-only mode + cpu_only = args.gpus_per_host == 0 or args.accelerator.lower() == "none" + + print("=" * 60) + print("Monarch Getting Started with SkyPilot") + print("=" * 60) + print(f"\nConfiguration:") + print(f" Cloud: {args.cloud}") + print(f" Hosts: {args.num_hosts}") + if cpu_only: + print(f" Mode: CPU-only (no GPUs)") + else: + print(f" GPUs per host: {args.gpus_per_host}") + print(f" Accelerator: {args.accelerator}") + print(f" Cluster name: {args.cluster_name}") + if args.region: + print(f" Region: {args.region}") + + # Create a SkyPilotJob to provision nodes + print("\n[1] Creating SkyPilot job...") + + # Build resources specification + resources_kwargs = { + "cloud": get_cloud(args.cloud), + } + # Only request GPUs if not in CPU-only mode + if not cpu_only: + resources_kwargs["accelerators"] = args.accelerator + if args.region: + resources_kwargs["region"] = args.region + + # Create a SkyPilotJob to provision nodes + job = SkyPilotJob( + # Define the mesh of hosts we need + meshes={"trainers": args.num_hosts}, + resources=sky.Resources(**resources_kwargs), + cluster_name=args.cluster_name, + # Auto-cleanup after 10 minutes of idle time (recommended for auto clean up if the job/controller fails) + idle_minutes_to_autostop=10, + down_on_autostop=True, + ) + + try: + # Get the job state - this launches the cluster and returns HostMeshes + print("\n[2] Launching cluster and starting Monarch workers...") + state = job.state() + + # Get our host mesh + hosts = state.trainers + print(f" Got host mesh with extent: {hosts.extent}") + + # ==================================================================== + # Step 3: Spawn processes and actors on the cloud hosts + # ==================================================================== + + print("\n[3] Spawning processes on cloud hosts...") + # Create a process mesh + if cpu_only: + 
# CPU-only mode: spawn 1 CPU process per host + procs: ProcMesh = hosts.spawn_procs(per_host={"procs": 1}) + else: + procs: ProcMesh = hosts.spawn_procs(per_host={"gpus": args.gpus_per_host}) + print(f" Process mesh extent: {procs.extent}") + + # Spawn counter actors + print("\n[4] Spawning Counter actors...") + counters: Counter = procs.spawn("counters", Counter, initial_value=0) + + # ==================================================================== + # Step 4: Interact with the actors + # ==================================================================== + + # Broadcast increment to all counters + print("\n[5] Broadcasting increment to all counters...") + counters.increment.broadcast() + counters.increment.broadcast() + counters.increment.broadcast() + + # Get all counter values + print("\n[6] Getting counter values...") + values = counters.get_value.call().get() + print(f" Counter values: {values}") + + # Spawn trainer actors + print("\n[7] Spawning Trainer actors...") + trainers: Trainer = procs.spawn("trainers", Trainer) + + # Do a training step + print("\n[8] Performing distributed training step...") + results = trainers.step.call().get() + for r in results: + print(f" {r}") + + # Get trainer info + print("\n[9] Getting trainer info...") + info = trainers.get_info.call().get() + for i in info: + print(f" {i}") + + print("\n" + "=" * 60) + print("Success! Monarch actors ran on SkyPilot cluster!") + print("=" * 60) + + except Exception as e: + print(f"\nERROR: {e}") + import traceback + traceback.print_exc() + print(f"\n[10] ERROR - not cleaning up cluster for debugging...") + print(f" You can debug with: sky ssh {args.cluster_name}") + print(f" To clean up later: sky down {args.cluster_name}") + raise + else: + # Clean up - tear down the SkyPilot cluster + print("\n[10] Cleaning up SkyPilot cluster...") + job.kill() + print(" Cluster terminated.") + + +if __name__ == "__main__": + main() +