diff --git a/codecarbon/core/resource_tracker.py b/codecarbon/core/resource_tracker.py index 120faf4ef..478b07e52 100644 --- a/codecarbon/core/resource_tracker.py +++ b/codecarbon/core/resource_tracker.py @@ -28,6 +28,7 @@ def set_RAM_tracking(self): self.ram_tracker = "RAM power estimation model" ram = RAM( tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, force_ram_power=self.tracker._force_ram_power, ) self.tracker._conf["ram_total_size"] = ram.machine_memory_GB @@ -46,6 +47,7 @@ def _setup_cpu_load_mode(self, tdp, max_power): model, max_power, tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = MODE_CPU_LOAD self.tracker._conf["cpu_model"] = hardware_cpu.get_model() @@ -141,6 +143,7 @@ def _setup_fallback_tracking(self, tdp, max_power): model, max_power, tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = MODE_CPU_LOAD else: @@ -163,6 +166,7 @@ def _setup_fallback_tracking(self, tdp, max_power): model, max_power, tracking_mode=self.tracker._tracking_mode, + tracking_pids=self.tracker._tracking_pids, ) self.cpu_tracker = MODE_CPU_LOAD else: diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index 2bb73cbd2..79e3dc0f4 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -176,6 +176,7 @@ def __init__( str ] = _sentinel, # Deprecated, use electricitymaps_api_token tracking_mode: Optional[str] = _sentinel, + tracking_pids: Optional[List[int]] = _sentinel, log_level: Optional[Union[int, str]] = _sentinel, on_csv_write: Optional[str] = _sentinel, logger_preamble: Optional[str] = _sentinel, @@ -231,6 +232,8 @@ def __init__( power consumption due to the entire machine or to try and isolate the tracked processe's in isolation. Defaults to "machine". + :param tracking_pids: PID of the process to be tracked when using "process" mode. + Defaults to None, which means the current process. :param log_level: Global codecarbon log level. Accepts one of: {"debug", "info", "warning", "error", "critical"}. Defaults to "info". 
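A minimal usage sketch of the new `tracking_pids` parameter (the PID list and measurement interval are illustrative; everything else is the existing public `EmissionsTracker` API):

```python
import os

from codecarbon import EmissionsTracker

# "process" mode with an explicit PID list; with tracking_pids=None the tracker
# falls back to the current process (and, with this change, its children).
tracker = EmissionsTracker(
    tracking_mode="process",
    tracking_pids=[os.getpid()],
    measure_power_secs=10,
)
tracker.start()
# ... run the workload to be measured ...
emissions_kg = tracker.stop()
print(f"{emissions_kg} kg CO2eq")
```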
@@ -314,6 +317,7 @@ def __init__( self._set_from_conf(prometheus_url, "prometheus_url", "localhost:9091") self._set_from_conf(output_handlers, "output_handlers", []) self._set_from_conf(tracking_mode, "tracking_mode", "machine") + self._set_from_conf(tracking_pids, "tracking_pids", None, int) self._set_from_conf(on_csv_write, "on_csv_write", "append") self._set_from_conf(logger_preamble, "logger_preamble", "") self._set_from_conf(force_cpu_power, "force_cpu_power", None, float) @@ -383,6 +387,12 @@ def __init__( else: logger.info(f" GPU model: {self._conf.get('gpu_model')}") + if self._tracking_mode == "process": + logger.info(" Tracking mode: process") + logger.info(" Tracked PIDs: " + str(self._tracking_pids)) + else: + logger.info(" Tracking mode: machine") + # Run `self._measure_power_and_energy` every `measure_power_secs` seconds in a # background thread self._scheduler = PeriodicScheduler( @@ -447,7 +457,12 @@ def _init_output_methods(self, *, api_key: str = None): self.run_id = uuid.uuid4() if self._save_to_prometheus: - self._output_handlers.append(PrometheusOutput(self._prometheus_url)) + self._output_handlers.append( + PrometheusOutput( + self._prometheus_url, + jobname=self._project_name + "_" + self._experiment_name, + ) + ) if self._save_to_logfire: self._output_handlers.append(LogfireOutput()) @@ -686,6 +701,10 @@ def stop(self) -> Optional[float]: self.final_emissions_data = emissions_data self.final_emissions = emissions_data.emissions + + for handler in self._output_handlers: + handler.exit() + return emissions_data.emissions def _persist_data( @@ -1163,6 +1182,7 @@ def track_emissions( str ] = _sentinel, # Deprecated, use electricitymaps_api_token tracking_mode: Optional[str] = _sentinel, + tracking_pids: Optional[List[int]] = _sentinel, log_level: Optional[Union[int, str]] = _sentinel, on_csv_write: Optional[str] = _sentinel, logger_preamble: Optional[str] = _sentinel, @@ -1223,6 +1243,8 @@ def track_emissions( power consumption due to the entire machine or to try and isolate the tracked processe's in isolation. Defaults to "machine". + :param tracking_pids: PID of the process to be tracked when using "process" mode. + Defaults to None, which means the current process. :param log_level: Global codecarbon log level. Accepts one of: {"debug", "info", "warning", "error", "critical"}. Defaults to "info". 
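The `handler.exit()` loop added to `stop()` gives every output handler a shutdown hook (the no-op default lands in `base_output.py` further down in this diff). A rough sketch of a custom handler using it; the `StdoutOutput` class itself is illustrative, only `BaseOutput`, `output_handlers` and `save_to_file` are existing codecarbon names:

```python
from codecarbon import OfflineEmissionsTracker
from codecarbon.output_methods.base_output import BaseOutput


class StdoutOutput(BaseOutput):
    """Toy handler: print each reading, then clean up when the tracker stops."""

    def out(self, total, delta):
        print(f"energy so far: {total.energy_consumed:.6f} kWh")

    def live_out(self, total, delta):
        pass

    def task_out(self, data, experiment_name):
        pass

    def exit(self):
        # Called once from EmissionsTracker.stop(); flush buffers / close connections here.
        print("tracker stopped, cleaning up")


tracker = OfflineEmissionsTracker(
    country_iso_code="DEU",
    output_handlers=[StdoutOutput()],
    save_to_file=False,
)
tracker.start()
tracker.stop()
```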
@@ -1297,6 +1319,7 @@ def wrapped_fn(*args, **kwargs): gpu_ids=gpu_ids, electricitymaps_api_token=_electricitymaps_token, tracking_mode=tracking_mode, + tracking_pids=tracking_pids, log_level=log_level, on_csv_write=on_csv_write, logger_preamble=logger_preamble, @@ -1336,6 +1359,7 @@ def wrapped_fn(*args, **kwargs): experiment_name=experiment_name, electricitymaps_api_token=_electricitymaps_token, tracking_mode=tracking_mode, + tracking_pids=tracking_pids, log_level=log_level, on_csv_write=on_csv_write, logger_preamble=logger_preamble, diff --git a/codecarbon/external/hardware.py b/codecarbon/external/hardware.py index 30ce125ee..38dd9b0c4 100644 --- a/codecarbon/external/hardware.py +++ b/codecarbon/external/hardware.py @@ -168,6 +168,7 @@ def __init__( tdp: int, rapl_dir: str = "/sys/class/powercap/intel-rapl/subsystem", tracking_mode: str = "machine", + tracking_pids: int = None, rapl_include_dram: bool = False, rapl_prefer_psys: bool = False, ): @@ -179,9 +180,17 @@ def __init__( self._tdp = tdp self._is_generic_tdp = False self._tracking_mode = tracking_mode - self._pid = psutil.Process().pid + self._tracking_pids = tracking_pids self._cpu_count = count_cpus() - self._process = psutil.Process(self._pid) + + if tracking_pids is not None: + # Make list if it is not already a list + if not isinstance(tracking_pids, list): + self._tracking_pids = [tracking_pids] + else: + self._tracking_pids = tracking_pids + else: + self._tracking_pids = [psutil.Process().pid] if self._mode == "intel_power_gadget": self._intel_interface = IntelPowerGadget(self._output_dir) @@ -246,11 +255,43 @@ def _get_power_from_cpu_load(self): ) elif self._tracking_mode == "process": - cpu_load = self._process.cpu_percent(interval=0.5) / self._cpu_count + cpu_load = 0 + + for pid in self._tracking_pids: + if not psutil.pid_exists(pid): + # Log a warning and continue + logger.warning(f"Process with pid {pid} does not exist anymore.") + continue + self._process = psutil.Process(pid) + cpu_load += self._process.cpu_percent(interval=0.5) + + try: + children = self._process.children(recursive=True) + for child in children: + try: + # Use interval=0.0 for children to avoid blocking + child_cpu = child.cpu_percent(interval=0.0) + logger.info(f"Child {child.pid} CPU: {child_cpu}") + cpu_load += child_cpu + except ( + psutil.NoSuchProcess, + psutil.AccessDenied, + psutil.ZombieProcess, + ): + # Child process may have terminated or we don't have access + continue + except (psutil.NoSuchProcess, psutil.AccessDenied): + # Main process terminated or access denied + pass + + # Normalize by CPU count + logger.info(f"Total CPU load (all processes): {cpu_load}") + cpu_load = cpu_load / self._cpu_count power = self._tdp * cpu_load / 100 logger.debug( - f"CPU load {self._tdp} W and {cpu_load * 100:.1f}% => estimation of {power} W for process {self._pid}." + f"CPU load {self._tdp} W and {cpu_load * 100:.1f}% => estimation of {power} W for processes {self._tracking_pids} (including children)." 
) + else: raise Exception(f"Unknown tracking_mode {self._tracking_mode}") return Power.from_watts(power) @@ -337,6 +378,7 @@ def from_utils( model: Optional[str] = None, tdp: Optional[int] = None, tracking_mode: str = "machine", + tracking_pids: int = None, rapl_include_dram: bool = False, rapl_prefer_psys: bool = False, ) -> "CPU": @@ -364,6 +406,7 @@ def from_utils( model=model, tdp=tdp, tracking_mode=tracking_mode, + tracking_pids=tracking_pids, rapl_include_dram=rapl_include_dram, rapl_prefer_psys=rapl_prefer_psys, ) diff --git a/codecarbon/external/ram.py b/codecarbon/external/ram.py index 20a5c2fad..6a24758ea 100644 --- a/codecarbon/external/ram.py +++ b/codecarbon/external/ram.py @@ -1,8 +1,9 @@ import math import re import subprocess +import traceback from dataclasses import dataclass -from typing import Optional +from typing import List, Optional import psutil @@ -34,9 +35,9 @@ class RAM(BaseHardware): def __init__( self, - pid: int = psutil.Process().pid, children: bool = True, tracking_mode: str = "machine", + tracking_pids: Optional[List[int]] = None, force_ram_power: Optional[int] = None, ): """ @@ -45,19 +46,24 @@ def __init__( is True. Args: - pid (int, optional): Process id (with respect to which we'll look for - children). Defaults to psutil.Process().pid. children (int, optional): Look for children of the process when computing total RAM used. Defaults to True. tracking_mode (str, optional): Whether to track "machine" or "process" RAM. Defaults to "machine". + tracking_pids ([int], optional): Process id to track RAM usage for "process" + tracking_mode. Defaults to None. force_ram_power (int, optional): User-provided RAM power in watts. If provided, this value is used instead of estimating RAM power. Defaults to None. """ - self._pid = pid self._children = children self._tracking_mode = tracking_mode + + if tracking_mode == "process" and tracking_pids is None: + self._tracking_pids = [psutil.Process().pid] + else: + self._tracking_pids = tracking_pids + self._force_ram_power = force_ram_power # Check if using ARM architecture self.is_arm_cpu = self._detect_arm_cpu() @@ -192,16 +198,21 @@ def _calculate_ram_power(self, memory_gb: float) -> float: # Apply minimum power constraint return max(min_power, total_power) - def _get_children_memories(self): + def _get_children_memories(self, pid: int): """ Compute the used RAM by the process's children Returns: list(int): The list of RAM values """ - current_process = psutil.Process(self._pid) + memorie_consumption = dict() + current_process = psutil.Process(pid) + children = current_process.children(recursive=True) - return [child.memory_info().rss for child in children] + for child in children: + memorie_consumption[child.pid] = child.memory_info().rss + + return memorie_consumption def _read_slurm_scontrol(self): try: @@ -285,17 +296,35 @@ def slurm_memory_GB(self): return mem @property - def process_memory_GB(self): + def process_memory_GB(self) -> float: """ Property to compute the process's total memory usage in bytes. 
Returns: float: RAM usage (GB) """ - children_memories = self._get_children_memories() if self._children else [] - main_memory = psutil.Process(self._pid).memory_info().rss - memories = children_memories + [main_memory] - return sum([m for m in memories if m] + [0]) / B_TO_GB + + # Store memory usage in dict to avoid double counting + total_memory = dict() + + for pid in self._tracking_pids: + if not psutil.pid_exists(pid): + logger.warning(f"Process with pid {pid} does not exist anymore.") + continue + + # Own memory + total_memory[pid] = psutil.Process(pid).memory_info().rss + + # Children's memory + children_memories = self._get_children_memories(pid) + for child_pid, mem in children_memories.items(): + total_memory[child_pid] = mem + + # Reduce to total memory + total_memory = sum(total_memory.values()) + logger.debug(f"Process total memory usage: {total_memory / B_TO_GB:.2f} GB") + + return total_memory / B_TO_GB @property def machine_memory_GB(self): @@ -338,6 +367,7 @@ def total_power(self) -> Power: ) except Exception as e: logger.warning(f"Could not measure RAM Power ({str(e)})") + logger.warning(traceback.format_exc()) ram_power = Power.from_watts(0) return ram_power diff --git a/codecarbon/output_methods/base_output.py b/codecarbon/output_methods/base_output.py index 4b152c29b..ff6ea1778 100644 --- a/codecarbon/output_methods/base_output.py +++ b/codecarbon/output_methods/base_output.py @@ -22,3 +22,6 @@ def live_out(self, total: EmissionsData, delta: EmissionsData): def task_out(self, data: List[TaskEmissionsData], experiment_name: str): pass + + def exit(self): + pass diff --git a/codecarbon/output_methods/metrics/metric_docs.py b/codecarbon/output_methods/metrics/metric_docs.py index 864641ee8..d62d48f71 100644 --- a/codecarbon/output_methods/metrics/metric_docs.py +++ b/codecarbon/output_methods/metrics/metric_docs.py @@ -50,17 +50,22 @@ class MetricDocumentation: ) cpu_energy_doc = MetricDocumentation( "codecarbon_cpu_energy", - description="Energy used per CPU (kWh)", + description="Energy used per CPU since last reading (kWh)", ) gpu_energy_doc = MetricDocumentation( "codecarbon_gpu_energy", - description="Energy used per GPU (kWh)", + description="Energy used per GPU since last reading (kWh)", ) ram_energy_doc = MetricDocumentation( "codecarbon_ram_energy", - description="Energy used per RAM (kWh)", + description="Energy used per RAM since last reading (kWh)", ) energy_consumed_doc = MetricDocumentation( "codecarbon_energy_consumed", - description="Sum of cpu_energy, gpu_energy and ram_energy (kW)", + description="Sum of cpu_energy, gpu_energy and ram_energy (kWh)", +) + +energy_consumed_total_doc = MetricDocumentation( + "codecarbon_energy_total", + description="Accumulated cpu_energy, gpu_energy and ram_energy (kWh)", ) diff --git a/codecarbon/output_methods/metrics/prometheus.py b/codecarbon/output_methods/metrics/prometheus.py index 24318c6b8..b09d5f0bc 100644 --- a/codecarbon/output_methods/metrics/prometheus.py +++ b/codecarbon/output_methods/metrics/prometheus.py @@ -1,7 +1,13 @@ import dataclasses import os -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway +from prometheus_client import ( + CollectorRegistry, + Counter, + Gauge, + delete_from_gateway, + push_to_gateway, +) from prometheus_client.exposition import basic_auth_handler from codecarbon.external.logger import logger @@ -15,6 +21,7 @@ emissions_doc, emissions_rate_doc, energy_consumed_doc, + energy_consumed_total_doc, gpu_energy_doc, gpu_power_doc, ram_energy_doc, @@ -35,6 +42,8 
@@ # TODO: Set up the possible labels labelnames = [ "project_name", + "experiment_id", + "experiment_name", "country_name", "country_iso_code", "region", @@ -60,6 +69,15 @@ def generate_gauge(metric_doc: MetricDocumentation): ) +def generate_counter(metric_doc: MetricDocumentation): + return Counter( + metric_doc.name, + metric_doc.description, + labelnames, + registry=registry, + ) + + duration_gauge = generate_gauge(duration_doc) emissions_gauge = generate_gauge(emissions_doc) emissions_rate_gauge = generate_gauge(emissions_rate_doc) @@ -70,6 +88,7 @@ def generate_gauge(metric_doc: MetricDocumentation): gpu_energy_gauge = generate_gauge(gpu_energy_doc) ram_energy_gauge = generate_gauge(ram_energy_doc) energy_consumed_gauge = generate_gauge(energy_consumed_doc) +energy_consumed_total = generate_counter(energy_consumed_total_doc) class PrometheusOutput(BaseOutput): @@ -77,8 +96,18 @@ class PrometheusOutput(BaseOutput): Send emissions data to prometheus pushgateway """ - def __init__(self, prometheus_url: str): + def __init__(self, prometheus_url: str, jobname: str = "codecarbon"): self.prometheus_url = prometheus_url + self.jobname = jobname + + def exit(self): + # Cleanup metrics from pushgateway on shutdown, prometheus should already have scraped them + # Otherwise they will persist with their last values + try: + logger.info("Deleting metrics from Prometheus Pushgateway") + delete_from_gateway(self.prometheus_url, job=self.jobname) + except Exception as e: + logger.error(e, exc_info=True) def out(self, total: EmissionsData, delta: EmissionsData): try: @@ -121,10 +150,14 @@ def add_emission(self, carbon_emission: dict): ]: gauge.labels(**labels).set(carbon_emission[emission_name]) + # Update the total energy consumed counter + # + energy_consumed_total.labels(**labels).inc(carbon_emission["energy_consumed"]) + # Send the new metric values push_to_gateway( self.prometheus_url, - job="codecarbon", + job=self.jobname, registry=registry, handler=self._auth_handler, ) diff --git a/docs/_sources/output.rst.txt b/docs/_sources/output.rst.txt index 2a6a4a369..478a32aca 100644 --- a/docs/_sources/output.rst.txt +++ b/docs/_sources/output.rst.txt @@ -30,11 +30,11 @@ input parameter (defaults to the current directory), for each experiment tracked * - emissions_rate - emissions divided per duration, in Kg/s * - cpu_power - - CPU power (W) + - Mean CPU power (W) * - gpu_power - - GPU power (W) + - Mean GPU power (W) * - ram_power - - RAM power (W) + - Mean RAM power (W) * - cpu_energy - Energy used per CPU (kWh) * - gpu_energy @@ -130,7 +130,7 @@ Logfire Using CodeCarbon with logfire ````````````````````````````````` -`Logfire `_ is an observability platform. +`Logfire `_ is an observability platform. CodeCarbon exposes all its metrics with the suffix `codecarbon_`. 
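The Pushgateway changes amount to two things: push under a per-run job name (`project_name + "_" + experiment_name`) instead of the shared `codecarbon` job, and delete that job's metrics from the gateway on shutdown so the last values are not re-scraped indefinitely. Both calls are standard `prometheus_client` API; the gateway address and job name in this sketch are placeholders:

```python
from prometheus_client import CollectorRegistry, Gauge, delete_from_gateway, push_to_gateway

registry = CollectorRegistry()
duration = Gauge("codecarbon_duration", "Duration of the run (s)", registry=registry)
duration.set(42.0)

# One job per run, mirroring jobname = project_name + "_" + experiment_name
jobname = "my_project_my_experiment"
push_to_gateway("localhost:9091", job=jobname, registry=registry)

# On tracker stop: remove this job's series so stale gauges do not linger on the gateway.
delete_from_gateway("localhost:9091", job=jobname)
```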
diff --git a/docs/_sources/parameters.rst.txt b/docs/_sources/parameters.rst.txt index cf7884c88..26b17f896 100644 --- a/docs/_sources/parameters.rst.txt +++ b/docs/_sources/parameters.rst.txt @@ -24,6 +24,9 @@ Input Parameters * - tracking_mode - | ``machine`` measure the power consumptions of the entire machine (default) | ``process`` try and isolate the tracked processes in isolation + * - tracking_pids + - | List of PIDs to track when using ``process`` tracking mode, + | defaults to ``None``, which tracks the current process * - gpu_ids - | Comma-separated list of GPU ids to track, defaults to ``None`` | These can either be integer indexes of GPUs on the system, or prefixes diff --git a/docs/edit/parameters.rst b/docs/edit/parameters.rst index cf7884c88..26b17f896 100644 --- a/docs/edit/parameters.rst +++ b/docs/edit/parameters.rst @@ -24,6 +24,9 @@ Input Parameters * - tracking_mode - | ``machine`` measure the power consumptions of the entire machine (default) | ``process`` try and isolate the tracked processes in isolation + * - tracking_pids + - | List of PIDs to track when using ``process`` tracking mode, + | defaults to ``None``, which tracks the current process * - gpu_ids - | Comma-separated list of GPU ids to track, defaults to ``None`` | These can either be integer indexes of GPUs on the system, or prefixes diff --git a/examples/slurm_logging.py b/examples/slurm_logging.py new file mode 100755 index 000000000..6ebdcb693 --- /dev/null +++ b/examples/slurm_logging.py @@ -0,0 +1,207 @@ +#!/root/.venv/codecarbon/bin/python3 + +import argparse +import logging +import os +import subprocess as sp +import sys +import time +import traceback + +import psutil + +from codecarbon import OfflineEmissionsTracker + + +def _print_process_tree(proc, indent=0): + prefix = " " * indent + try: + name = proc.name() + pid = proc.pid + except psutil.NoSuchProcess: + return + + log_message(f"{prefix}{name} (pid {pid})\n") + + # Children + for child in proc.children(recursive=False): + _print_process_tree(child, indent + 4) + + +def print_process_tree(pid=os.getpid()): + current = psutil.Process(pid) + log_message("\n=== Parent Tree ===\n") + p = current + stack = [] + while p is not None: + stack.append(p) + p = p.parent() + + # Print ancestors from root → current + for proc in reversed(stack): + log_message(f"{proc.name()} (pid {proc.pid})\n") + log_message("\n=== Children Tree ===\n") + _print_process_tree(current) + + +def query_slurm_pids(jobid): + + try: + sp_output = sp.check_output( + ["/usr/local/bin/scontrol", "listpids", str(jobid)], stderr=sp.STDOUT + ) + log_message(f"scontrol output:\n{sp_output.decode()}") + except sp.CalledProcessError as e: + log_message(f"scontrol failed for job {jobid}\n") + log_message(f"Return code: {e.returncode}\n") + log_message(f"Output:\n{e.output.decode(errors='replace')}\n") + return [] + except Exception as e: + # Catch-all for other failures + log_message(f"Unexpected error calling scontrol: {e}\n") + return [] + + pids = [] + lines = sp_output.decode().strip().splitlines() + for line in lines[1:]: # Skip the first line + parts = line.split() + if not parts: + continue + + pid = parts[0] + # skip invalid PIDs + if pid in ("-1", "-"): + continue + + try: + pids.append(int(pid)) + except ValueError: + # In case pid is something unexpected + continue + + return pids + + +def log_message(message): + print(message) + if logfile is not None: + logfile.write(message + "\n") + logfile.flush() + + +def build_argument_parser(): + parser = 
argparse.ArgumentParser(description="CodeCarbon job wrapper") + group_ids = parser.add_mutually_exclusive_group(required=True) + group_ids.add_argument( + "--jobid", type=int, required=False, default=None, help="SLURM Job ID" + ) + group_ids.add_argument( + "--pids", type=int, nargs="+", required=False, default=[], help="Process ID" + ) + + parser.add_argument("--user", type=str, required=True, help="SLURM Job User") + parser.add_argument( + "--gpuids", + type=str, + required=False, + help="Comma-separated GPU IDs assigned to the job", + ) + return parser + + +################################################################### + +# Loglevel debug +logging.basicConfig(level=logging.DEBUG) + +logfile = None +try: + parser = build_argument_parser() + args = parser.parse_args() + + jobid = args.jobid + pids = args.pids + + user = args.user + if args.gpuids: + gpuids = args.gpuids.split(",") + else: + gpuids = [] + + os.environ["SLURM_JOB_ID"] = str(jobid) + os.environ["SLURM_JOB_USER"] = str(user) + os.environ["SLURM_JOB_GPUS"] = ",".join(gpuids) + + logfile = open(f"/tmp/cc_{jobid}.log", "w", buffering=1) + log_message("Python started") + log_message(f"Interpreter: {sys.executable}") + + log_message("CodeCarbon SLURM Prolog Script Started") + log_message(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}") + + log_message("Available environment variables:") + for key, value in os.environ.items(): + log_message(f"{key}: {value}") + + log_message("Wait 60 seconds to allow job processes to start") + for i in range(60): + log_message(f" Waiting... {i} seconds elapsed") + time.sleep(1) # Give some time for the job to start properly + + log_message( + "Wait completed at " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ) + + if jobid is None: + log_message(f"Using provided PIDs: {pids}") + else: + log_message('Parse scontrol for process IDs with "scontrol listpids"') + pids = query_slurm_pids(jobid) + + log_message(f"Found PIDs: {pids}") + + for pid in pids: + log_message(f"Process tree for PID {pid}:") + print_process_tree(pid) + + log_message(f"Job ID: {jobid}, User: {user}, GPU IDs: {gpuids}") + + tracker = OfflineEmissionsTracker( + country_iso_code="DEU", + region="DE-NW", + measure_power_secs=10, + api_call_interval=2, + gpu_ids=",".join(gpuids) if gpuids else None, + tracking_mode="process", + tracking_pids=pids, + save_to_prometheus=True, + prometheus_url="129.217.31.239:9091", + project_name=f"{user}", + experiment_name=f"{jobid}", + output_dir="/tmp/codecarbon_log/", + output_file="/tmp/codecarbon_log/emission.csv", + ) + + tracker.start() + + # Check for termination signal every second + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + log_message("Received termination signal. 
Stopping CodeCarbon tracker...") + except Exception as e: + log_message(f"Exception in tracking loop: {e}") + raise e + finally: + tracker.stop() + log_message("CodeCarbon tracker stopped.") + +except Exception: + log_message("Exception occurred:") + log_message(traceback.format_exc()) + +finally: + if logfile is not None: + log_message("CodeCarbon SLURM Prolog Script Ended") + logfile.close() diff --git a/examples/slurm_prolog.sh b/examples/slurm_prolog.sh new file mode 100755 index 000000000..eaa24859c --- /dev/null +++ b/examples/slurm_prolog.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +JOBID=$SLURM_JOB_ID +LOGFILE="/tmp/prolog_${JOBID}.log" +PIDFILE="/tmp/prolog_${JOBID}.pid" + +echo "Starting CodeCarbon for job $JOBID" >> "$LOGFILE" + +mkdir -p /tmp/codecarbon_log/ + +# Check if GPU IDs are available +if [ -z "$SLURM_JOB_GPUS" ]; then + # We cannot inherit the cgroup because slum kills the entire cgroup on end of initialization + systemd-run --unit codecarbon_$JOBID \ + /etc/slurm/pyscripts/_codecarbon.py \ + --jobid $JOBID \ + --user $SLURM_JOB_USER \ + &>> "$LOGFILE" +else + systemd-run --unit codecarbon_$JOBID \ + /etc/slurm/pyscripts/_codecarbon.py \ + --jobid $JOBID \ + --user $SLURM_JOB_USER \ + --gpuids $SLURM_JOB_GPUS \ + &>> "$LOGFILE" +fi + +# Save PID for epilog +sleep 1 +exit 0 + + diff --git a/tests/test_pid_tracking.py b/tests/test_pid_tracking.py new file mode 100644 index 000000000..14fa204dd --- /dev/null +++ b/tests/test_pid_tracking.py @@ -0,0 +1,85 @@ +import os +import subprocess as sp +import tempfile +import time +import unittest + +from codecarbon.emissions_tracker import OfflineEmissionsTracker + +python_load_code = """ +import math +i = 0 +erg = 0 +while True: + i += 1 + a = math.sqrt(64*64*64*64*64) + erg += a +print(erg) +""" + + +class TestPIDTracking(unittest.TestCase): + def setUp(self) -> None: + self.project_name = "project_TestPIDTracking" + self.emissions_file = "emissions-test-TestPIDTracking" + self.emissions_path = tempfile.gettempdir() + self.emissions_file_path = os.path.join( + self.emissions_path, self.emissions_file + ) + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + self.pids = [] + self.process = [] + for _ in range(128): + self.process.append(sp.Popen(["python", "-c", python_load_code])) + self.pids.append(self.process[-1].pid) + self.pids.append(os.getpid()) + + def tearDown(self) -> None: + if os.path.isfile(self.emissions_file_path): + os.remove(self.emissions_file_path) + + for proc in self.process: + proc.terminate() + proc.wait() + + def test_carbon_pid_tracking_offline(self): + + # Subprocess PIDs are children, therefore both should be equal + tracker_pid = OfflineEmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file + "_pid.csv", + tracking_mode="process", + tracking_pids=self.pids, + ) + tracker_self = OfflineEmissionsTracker( + output_dir=self.emissions_path, + output_file=self.emissions_file + "_global.csv", + tracking_mode="process", + tracking_pids=[os.getpid()], + ) + + tracker_pid.start() + tracker_self.start() + + time.sleep(5) + + emissions_pid = tracker_pid.stop() + emissions_self = tracker_self.stop() + + print(f"Emissions (self): {emissions_self} kgCO2eq") + print(f"Emissions (pid): {emissions_pid} kgCO2eq") + + if not isinstance(emissions_pid, float): + print(emissions_pid) + assert isinstance(emissions_pid, float) + + self.assertNotEqual(emissions_pid, 0.0) + + # Compare emissions from both trackers, should be less than 10% difference + diff = 
abs(emissions_pid - emissions_self) + avg = (emissions_pid + emissions_self) / 2 + percent_diff = (diff / avg) * 100 + print(f"Percent difference: {percent_diff}%") + self.assertLessEqual(percent_diff, 10.0)
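For reference, the per-process accounting added in `hardware.py` and `ram.py` above (and exercised by this test) can be reproduced standalone with psutil: sum `cpu_percent` over the tracked PIDs and their children and normalize by the logical CPU count, and sum RSS keyed by PID so a child reachable from two tracked parents is only counted once. A sketch under those assumptions; the function names, the 65 W TDP and the 1024**3 bytes-per-GB constant are illustrative, not codecarbon's:

```python
import psutil


def process_cpu_power_watts(pids, tdp_watts=65.0, interval=0.5):
    """Estimate CPU power as TDP scaled by the tracked processes' share of total CPU."""
    load = 0.0
    for pid in pids:
        if not psutil.pid_exists(pid):
            continue
        try:
            proc = psutil.Process(pid)
            load += proc.cpu_percent(interval=interval)
            for child in proc.children(recursive=True):
                try:
                    load += child.cpu_percent(interval=0.0)  # non-blocking for children
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    load /= psutil.cpu_count() or 1      # share of the whole machine, in percent
    return tdp_watts * load / 100.0


def process_memory_gb(pids):
    """Sum RSS over the tracked processes and their children, de-duplicated by PID."""
    rss_by_pid = {}
    for pid in pids:
        if not psutil.pid_exists(pid):
            continue
        try:
            proc = psutil.Process(pid)
            rss_by_pid[pid] = proc.memory_info().rss
            for child in proc.children(recursive=True):
                rss_by_pid[child.pid] = child.memory_info().rss
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return sum(rss_by_pid.values()) / 1024**3


me = psutil.Process().pid
print(process_cpu_power_watts([me]), "W,", process_memory_gb([me]), "GB")
```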