diff --git a/src/pyallel/main.py b/src/pyallel/main.py index ff14d9b..1f2efcc 100644 --- a/src/pyallel/main.py +++ b/src/pyallel/main.py @@ -3,28 +3,81 @@ import importlib.metadata import sys import traceback +import time from pyallel import constants from pyallel.colours import Colours from pyallel.errors import InvalidExecutableErrors from pyallel.parser import Arguments, create_parser +from pyallel.printer import Printer from pyallel.process_group_manager import ProcessGroupManager -def main_loop( - *args: str, - colours: Colours, - interactive: bool = False, - timer: bool = False, +def run_interactive( + process_group_manager: ProcessGroupManager, printer: Printer ) -> int: - process_group_manager = ProcessGroupManager.from_args( - *args, - colours=colours, - interactive=interactive, - timer=timer, - ) + while True: + process_group_manager.stream() - return process_group_manager.stream() + printer.clear_printed_lines() + output = process_group_manager.get_cur_process_group_output() + printer.print_progress_group_output( + output, process_group_manager._interrupt_count + ) + + poll = process_group_manager.poll() + if poll is not None: + printer.clear_printed_lines() + printer.print_progress_group_output( + output, process_group_manager._interrupt_count, tail_output=False + ) + + if poll > 0: + return poll + + printer.clear() + process_group_manager.run() + if not process_group_manager.next(): + return 0 + + time.sleep(0.1) + + +def run_non_interactive( + process_group_manager: ProcessGroupManager, printer: Printer +) -> int: + current_process = None + + while True: + outputs = process_group_manager.stream() + + for pg in outputs.process_group_outputs.values(): + for output in pg.processes: + if current_process is None: + current_process = output.process + output = process_group_manager.get_process(output.id) + printer.print_process_output( + output, include_progress=False, include_timer=False + ) + elif current_process is not output.process: + continue + else: + printer.print_process_output(output, include_cmd=False) + + if output.process.poll() is not None: + printer.print_process_output(output, include_output=False) + current_process = None + + poll = process_group_manager.poll() + if poll is not None: + if poll > 0: + return poll + + process_group_manager.run() + if not process_group_manager.next(): + return 0 + + time.sleep(0.1) def run(*args: str) -> int: @@ -41,6 +94,7 @@ def run(*args: str) -> int: return 2 colours = Colours.from_colour(parsed_args.colour) + printer = Printer(colours, timer=parsed_args.timer) interactive = True if not parsed_args.interactive: @@ -50,12 +104,13 @@ def run(*args: str) -> int: message = None try: - exit_code = main_loop( - *parsed_args.commands, - colours=colours, - interactive=interactive, - timer=parsed_args.timer, - ) + process_group_manager = ProcessGroupManager.from_args(*parsed_args.commands) + process_group_manager.run() + + if interactive: + exit_code = run_interactive(process_group_manager, printer) + else: + exit_code = run_non_interactive(process_group_manager, printer) except InvalidExecutableErrors as e: exit_code = 1 message = str(e) @@ -65,17 +120,11 @@ def run(*args: str) -> int: if exit_code == 1: if not message: - print( - f"{colours.dim_on}=>{colours.dim_off} {colours.red_bold}Failed!{colours.reset_colour}" - ) + printer.error("\nFailed!") else: - print( - f"{colours.dim_on}=>{colours.dim_off} {colours.red_bold}Error: {message}{colours.reset_colour}" - ) + printer.error(f"Error: {message}") elif exit_code == 0: - print( - f"{colours.dim_on}=>{colours.dim_off} {colours.green_bold}Done!{colours.reset_colour}" - ) + printer.ok("\nDone!") return exit_code diff --git a/src/pyallel/printer.py b/src/pyallel/printer.py new file mode 100644 index 0000000..77c7619 --- /dev/null +++ b/src/pyallel/printer.py @@ -0,0 +1,275 @@ +from __future__ import annotations + +import time + +from pyallel import constants +from pyallel.colours import Colours +from pyallel.process import ProcessOutput +from pyallel.process_group import ProcessGroupOutput + + +class Printer: + def __init__(self, colours: Colours | None = None, timer: bool = False) -> None: + self._colours = colours or Colours() + self._timer = timer + self._prefix = f"{self._colours.dim_on}=>{self._colours.dim_off} " + self._icon = 0 + self._printed: list[tuple[bool, str, str]] = [] + + def write( + self, + line: str, + include_prefix: bool = False, + end: str = "\n", + truncate: bool = False, + ) -> None: + prefix = self._prefix if include_prefix else "" + if truncate: + columns = constants.COLUMNS() - len(prefix) + if get_num_lines(line, columns) > 1: + line = truncate_line(line, columns) + print(f"{prefix}{line}", end=end, flush=True) + + def info(self, msg: str) -> None: + self.write( + f"{self._colours.white_bold}{msg}{self._colours.reset_colour}", + include_prefix=False, + ) + + def ok(self, msg: str) -> None: + self.write( + f"{self._colours.green_bold}{msg}{self._colours.reset_colour}", + include_prefix=False, + ) + + def warn(self, msg: str) -> None: + self.write( + f"{self._colours.yellow_bold}{msg}{self._colours.reset_colour}", + include_prefix=False, + ) + + def error(self, msg: str) -> None: + self.write( + f"{self._colours.red_bold}{msg}{self._colours.reset_colour}", + include_prefix=False, + ) + + def generate_process_output( + self, + output: ProcessOutput, + tail_output: bool = False, + include_cmd: bool = True, + include_output: bool = True, + include_progress: bool = True, + include_timer: bool | None = None, + append_newlines: bool = False, + ) -> list[tuple[bool, str, str]]: + out: list[tuple[bool, str, str]] = [] + line_parts: tuple[bool, str, str] + + if include_cmd: + status = self.generate_process_output_status( + output, include_progress, include_timer + ) + line_parts = (False, status, "\n") + out.append(line_parts) + self._printed.append(line_parts) + + if include_output: + lines = output.data.splitlines(keepends=True) + + if tail_output: + status_lines = get_num_lines(status) + output_lines = output.lines - status_lines + lines = lines[-output_lines:] + + for line in lines: + prefix = True + end = line[-1] + if append_newlines and end != "\n": + end = "\n" + else: + line = line[:-1] + + try: + prev_line = self._printed[-1] + except IndexError: + pass + else: + if prev_line[2] != "\n": + prefix = False + + line_parts = (prefix, line, end) + out.append(line_parts) + self._printed.append(line_parts) + + return out + + def generate_process_output_status( + self, + output: ProcessOutput, + include_progress: bool = True, + include_timer: bool | None = None, + ) -> str: + include_timer = include_timer if include_timer is not None else self._timer + + passed = None + icon = "" + poll = output.process.poll() + if include_progress: + icon = constants.ICONS[self._icon] + if poll is not None: + passed = poll == 0 + + if passed is True: + colour = self._colours.green_bold + msg = "done" + icon = constants.TICK + elif passed is False: + colour = self._colours.red_bold + msg = "failed" + icon = constants.X + else: + colour = self._colours.white_bold + msg = "running" + + if not icon: + msg += "..." + + out = f"{self._colours.white_bold}[{self._colours.reset_colour}{self._colours.blue_bold}{output.process.command}{self._colours.reset_colour}{self._colours.white_bold}]{self._colours.reset_colour}{colour} {msg} {icon}{self._colours.reset_colour}" + + if include_timer: + end = output.process.end + if not output.process.end: + end = time.perf_counter() + elapsed = end - output.process.start + out += f" {self._colours.dim_on}({format_time_taken(elapsed)}){self._colours.dim_off}" + + return out + + def generate_process_group_output( + self, + output: ProcessGroupOutput, + interrupt_count: int = 0, + tail_output: bool = True, + ) -> list[tuple[bool, str, str]]: + set_process_lines(output, interrupt_count) + + for out in output.processes: + self.generate_process_output(out, tail_output, append_newlines=True) + + if interrupt_count == 1: + self._printed.append((False, "", "\n")) + self._printed.append( + ( + False, + f"{self._colours.yellow_bold}Interrupt!{self._colours.reset_colour}", + "\n", + ) + ) + elif interrupt_count == 2: + self._printed.append((False, "", "\n")) + self._printed.append( + ( + False, + f"{self._colours.red_bold}Abort!{self._colours.reset_colour}", + "\n", + ) + ) + + self._icon += 1 + if self._icon == len(constants.ICONS): + self._icon = 0 + + return self._printed + + def print_process_output( + self, + output: ProcessOutput, + tail_output: bool = False, + include_cmd: bool = True, + include_output: bool = True, + include_progress: bool = True, + include_timer: bool | None = None, + ) -> None: + for include_prefix, line, end in self.generate_process_output( + output, + tail_output, + include_cmd, + include_output, + include_progress, + include_timer, + ): + self.write(line, include_prefix, end) + + def print_progress_group_output( + self, + output: ProcessGroupOutput, + interrupt_count: int = 0, + tail_output: bool = True, + ) -> None: + for include_prefix, line, end in self.generate_process_group_output( + output, interrupt_count, tail_output + ): + self.write(line, include_prefix, end, truncate=tail_output) + + def clear_printed_lines(self) -> None: + # Clear all the lines that were just printed + for _, _, end in self._printed: + if end == "\n": + self.write( + f"{constants.CLEAR_LINE}{constants.UP_LINE}{constants.CLEAR_LINE}", + end="", + ) + + self.clear() + + def clear(self) -> None: + self._printed.clear() + + +def set_process_lines( + output: ProcessGroupOutput, + interrupt_count: int = 0, + lines: int | None = None, +) -> None: + lines = lines or constants.LINES() - 1 + if interrupt_count: + lines -= 2 + + num_processes = len(output.processes) + remainder = lines % num_processes + tail = lines // num_processes + + for out in output.processes: + out.lines = tail + if remainder: + output.processes[-1].lines += remainder + + +def get_num_lines(line: str, columns: int | None = None) -> int: + lines = 0 + columns = columns or constants.COLUMNS() + line = constants.ANSI_ESCAPE.sub("", line) + length = len(line) + line_lines = 1 + if length > columns: + line_lines = length // columns + remainder = length % columns + if remainder: + line_lines += 1 + lines += 1 * line_lines + return lines + + +def truncate_line(line: str, columns: int | None = None) -> str: + columns = columns or constants.COLUMNS() + escaped_line = constants.ANSI_ESCAPE.sub("", line) + return "".join(escaped_line[:columns]) + "..." + + +def format_time_taken(time_taken: float) -> str: + time_taken = round(time_taken, 1) + seconds = time_taken % (24 * 3600) + + return f"{seconds}s" diff --git a/src/pyallel/process.py b/src/pyallel/process.py index 9ccf035..0f38630 100644 --- a/src/pyallel/process.py +++ b/src/pyallel/process.py @@ -4,20 +4,28 @@ import subprocess import tempfile import time -from dataclasses import dataclass, field from typing import BinaryIO -@dataclass +class ProcessOutput: + def __init__(self, id: int, process: Process, data: str = "") -> None: + self.id = id + self.data = data + self.process = process + self.lines = -1 + + def merge(self, other: ProcessOutput) -> None: + self.data += other.data + + class Process: - id: int - command: str - start: float = 0.0 - end: float = 0.0 - _fd: BinaryIO | None = field(init=False, repr=False, compare=False, default=None) - _process: subprocess.Popen[bytes] | None = field( - init=False, repr=False, compare=False, default=None - ) + def __init__(self, id: int, command: str) -> None: + self.id = id + self.command = command + self.start = 0.0 + self.end = 0.0 + self._fd: BinaryIO + self._process: subprocess.Popen[bytes] def run(self) -> None: self.start = time.perf_counter() @@ -32,41 +40,34 @@ def run(self) -> None: ) def __del__(self) -> None: - if self._fd: + try: self._fd.close() + except AttributeError: + pass def poll(self) -> int | None: - if self._process: - poll = self._process.poll() - if poll is not None and not self.end: - self.end = time.perf_counter() - return poll - return None + poll = self._process.poll() + if poll is not None and not self.end: + self.end = time.perf_counter() + return poll def read(self) -> bytes: - if self._fd: - return self._fd.read() - return b"" + return self._fd.read() def readline(self) -> bytes: - if self._fd: - return self._fd.readline() - return b"" + return self._fd.readline() def return_code(self) -> int | None: - if self._process: - return self._process.returncode - return None + return self._process.returncode def interrupt(self) -> None: - if self._process: + if hasattr(self, "_process"): self._process.send_signal(signal.SIGINT) def kill(self) -> None: - if self._process: + if hasattr(self, "_process"): self._process.send_signal(signal.SIGKILL) def wait(self) -> int: - if self._process: - return self._process.wait() - return -1 + return self._process.wait() + diff --git a/src/pyallel/process_group.py b/src/pyallel/process_group.py index 4a4c65e..99ecfd5 100644 --- a/src/pyallel/process_group.py +++ b/src/pyallel/process_group.py @@ -1,334 +1,79 @@ from __future__ import annotations -import time -from collections import defaultdict -from dataclasses import dataclass, field +from typing import Sequence -from pyallel import constants -from pyallel.colours import Colours from pyallel.errors import InvalidExecutableError, InvalidExecutableErrors -from pyallel.process import Process +from pyallel.process import Process, ProcessOutput -def get_num_lines(output: str, columns: int | None = None) -> int: - lines = 0 - columns = columns or constants.COLUMNS() - for line in output.splitlines(): - line = constants.ANSI_ESCAPE.sub("", line) - length = len(line) - line_lines = 1 - if length > columns: - line_lines = length // columns - remainder = length % columns - if remainder: - line_lines += 1 - lines += 1 * line_lines - return lines +class ProcessGroupOutput: + def __init__(self, id: int, processes: Sequence[ProcessOutput]) -> None: + self.id = id + self.processes = processes + def merge(self, other: ProcessGroupOutput) -> None: + for i, _ in enumerate(self.processes): + self.processes[i].merge(other.processes[i]) -def format_time_taken(time_taken: float) -> str: - time_taken = round(time_taken, 1) - seconds = time_taken % (24 * 3600) - return f"{seconds}s" - - -@dataclass class ProcessGroup: - processes: list[Process] - interactive: bool = False - timer: bool = False - output: dict[int, list[str]] = field(default_factory=lambda: defaultdict(list)) - process_lines: list[int] = field(default_factory=list) - completed_processes: set[int] = field(default_factory=set) - exit_code: int = 0 - interrupt_count: int = 0 - passed: bool = True - icon: int = 0 - colours: Colours = field(default_factory=Colours) - - def __post_init__(self) -> None: - self.process_lines = [0 for _ in self.processes] + def __init__(self, id: int, processes: list[Process]) -> None: + self.id = id + self.processes = processes + self._exit_code: int = 0 + self._interrupt_count: int = 0 - def stream(self) -> int: + def run(self) -> None: for process in self.processes: process.run() - if not self.interactive: - return self.stream_non_interactive() + def poll(self) -> int | None: + polls: list[int | None] = [process.poll() for process in self.processes] - while True: - output = self.complete_output() - self.icon += 1 - if self.icon == len(constants.ICONS): - self.icon = 0 + running = [p for p in polls if p is None] + failed = [p for p in polls if p is not None and p > 0] - print(output, end="", flush=True) - - # Clear all the lines that were just printed - for _ in range(get_num_lines(output) - (1 if self.exit_code > 1 else 0)): - print( - f"{constants.CLEAR_LINE}{constants.UP_LINE}{constants.CLEAR_LINE}", - end="", + if running: + return None + elif failed: + return 1 + else: + return 0 + + def stream(self) -> ProcessGroupOutput: + return ProcessGroupOutput( + id=self.id, + processes=[ + ProcessOutput( + id=process.id, process=process, data=process.read().decode() ) - - if len(self.completed_processes) == len(self.processes): - break - - time.sleep(0.1) - - print(self.complete_output(all=True), flush=True) - - if not self.exit_code and not self.passed: - self.exit_code = 1 - - return self.exit_code - - def stream_non_interactive(self) -> int: - running_process = None - interrupted = False - - while True: - output = "" - for process in self.processes: - if ( - running_process is None - and process.id not in self.completed_processes - ): - output += self._get_command_status(process) - output += "\n" - running_process = process - elif running_process is not process: - # Need to do this to properly keep track of how long all the other - # commands are taking - process.poll() - continue - - process_output = process.readline().decode() - - if not self.output[process.id] and process_output: - process_output = self._prefix(process_output) - self.output[process.id].append(process_output) - output += process_output - elif process_output: - if self.output[process.id][-1][-1] != "\n": - self.output[process.id][-1] += process_output - else: - process_output = self._prefix(process_output) - self.output[process.id].append(process_output) - output += process_output - - if process.poll() is not None: - if process.return_code() != 0: - self.passed = False - process_output = process.read().decode() - if process_output: - output += self._prefix(process_output) - - if (output and output[-1] != "\n") or ( - self.output[process.id] - and self.output[process.id][-1][-1] != "\n" - ): - output += "\n" - - output += self._get_command_status( - process, - passed=process.return_code() == 0, - timer=self.timer, - ) - output += f"\n{self.colours.dim_on}=>{self.colours.dim_off} \n" - self.completed_processes.add(process.id) - running_process = None - - if self.interrupt_count == 0: - pass - elif not interrupted and self.interrupt_count == 1: - if (output and output[-1] != "\n") or ( - self.output[process.id] - and self.output[process.id][-1][-1] != "\n" - ): - output += "\n" - output += f"{self.colours.dim_on}=>{self.colours.dim_off} \n{self.colours.dim_on}=>{self.colours.dim_off} {self.colours.yellow_bold}Interrupt!{self.colours.reset_colour}\n{self.colours.dim_on}=>{self.colours.dim_off} \n" - interrupted = True - - if output: - print(output, end="", flush=True) - - if len(self.completed_processes) == len(self.processes): - break - - time.sleep(0.01) - - if self.interrupt_count == 2: - print( - f"{self.colours.dim_on}=>{self.colours.dim_off} {self.colours.red_bold}Abort!{self.colours.reset_colour}", - flush=True, - ) - - if not self.exit_code and not self.passed: - self.exit_code = 1 - - return self.exit_code - - def _prefix(self, output: str, keepend: bool = True) -> str: - prefixed_output = "\n".join( - f"{self.colours.dim_on}=>{self.colours.dim_off} {line}{self.colours.reset_colour}" - for line in output.splitlines() + for process in self.processes + ], ) - if keepend and output and output[-1] == "\n": - prefixed_output += "\n" - return prefixed_output - - def _get_command_status( - self, - process: Process, - icon: str | None = None, - passed: bool | None = None, - timer: bool = False, - ) -> str: - if passed is True: - colour = self.colours.green_bold - msg = "done" - icon = icon or constants.TICK - elif passed is False: - colour = self.colours.red_bold - msg = "failed" - icon = icon or constants.X - else: - colour = self.colours.white_bold - msg = "running" - icon = icon or "" - if not icon: - msg += "..." - - output = f"{self.colours.dim_on}=>{self.colours.dim_off} {self.colours.white_bold}[{self.colours.reset_colour}{self.colours.blue_bold}{process.command}{self.colours.reset_colour}{self.colours.white_bold}]{self.colours.reset_colour}{colour} {msg} {icon}{self.colours.reset_colour}" - if timer: - end = process.end - if not process.end: - end = time.perf_counter() - elapsed = end - process.start - output += f" {self.colours.dim_on}({format_time_taken(elapsed)}){self.colours.dim_off}" - - return output - - def handle_signal(self, signum: int) -> None: + def handle_signal(self, _signum: int) -> None: for process in self.processes: - if self.interrupt_count == 0: + if self._interrupt_count == 0: process.interrupt() else: process.kill() - self.exit_code = 128 + signum - self.interrupt_count += 1 + self._interrupt_count += 1 @classmethod - def from_commands( - cls, - *commands: str, - colours: Colours | None = None, - interactive: bool = False, - timer: bool = False, - ) -> ProcessGroup: - colours = colours or Colours() + def from_commands(cls, id: int, process_id: int, *commands: str) -> ProcessGroup: processes: list[Process] = [] errors: list[InvalidExecutableError] = [] for i, command in enumerate(commands): try: - processes.append(Process(i + 1, command)) + processes.append(Process(i + process_id, command)) except InvalidExecutableError as e: errors.append(e) if errors: raise InvalidExecutableErrors(*errors) - process_group = cls( - processes=processes, - interactive=interactive, - timer=timer, - colours=colours, - ) + process_group = cls(id=id, processes=processes) return process_group - - def complete_output(self, all: bool = False) -> str: - num_processes = len(self.processes) - lines = constants.LINES() - (2 * num_processes) - remainder = lines % num_processes - tail = lines // num_processes - for i in range(num_processes): - self.process_lines[i] = tail - if remainder: - self.process_lines[-1] += remainder - 2 - else: - self.process_lines[-1] -= 2 - - output = "" - for i, process in enumerate(self.processes, start=1): - process_output = "" - if process.poll() is not None: - self.completed_processes.add(process.id) - if process.return_code() != 0: - self.passed = False - process_output += self._get_command_status( - process, - passed=process.return_code() == 0, - timer=self.timer, - ) - process_output += "\n" - else: - process_output += self._get_command_status( - process, - icon=constants.ICONS[self.icon], - timer=self.timer, - ) - process_output += "\n" - - command_lines = get_num_lines(process_output) - p_output = process.read().decode() - if not self.output[process.id]: - self.output[process.id].append("") - self.output[process.id][0] += p_output - p_output = self.output[process.id][0] - p_output_lines_num = 0 - if p_output: - if not all: - p_output_lines = p_output.splitlines()[-self.process_lines[i - 1] :] - p_output = "" - for line in p_output_lines: - if len(line) + 3 > constants.COLUMNS(): - p_output += f"{''.join(line[:constants.COLUMNS()-3])}\n" - else: - p_output += line + "\n" - p_output = self._prefix(p_output) - if p_output and p_output[-1] != "\n": - p_output += "\n" - if i != num_processes: - p_output += "\n" - p_output_lines_num = get_num_lines(p_output) - - if ( - not all - and (command_lines + p_output_lines_num) > self.process_lines[i - 1] - ): - truncate = (command_lines + p_output_lines_num) - self.process_lines[ - i - 1 - ] - p_output = "\n".join(p_output.splitlines()[truncate:]) - p_output += "\n" - - process_output += p_output - output += process_output - - if self.interrupt_count == 0: - return output - - if self.interrupt_count == 1: - output += ( - f"\n{self.colours.yellow_bold}Interrupt!{self.colours.reset_colour}" - ) - elif self.interrupt_count == 2: - output += f"\n{self.colours.red_bold}Abort!{self.colours.reset_colour}" - - return output diff --git a/src/pyallel/process_group_manager.py b/src/pyallel/process_group_manager.py index 0595d00..d4ea46b 100644 --- a/src/pyallel/process_group_manager.py +++ b/src/pyallel/process_group_manager.py @@ -1,74 +1,129 @@ from __future__ import annotations import signal -from dataclasses import dataclass, field from typing import Any -from pyallel.colours import Colours -from pyallel.process_group import ProcessGroup +from pyallel.process import ProcessOutput +from pyallel.process_group import ProcessGroupOutput, ProcessGroup + + +class ProcessGroupManagerOutput: + def __init__( + self, + process_group_outputs: dict[int, ProcessGroupOutput] | None = None, + cur_process_group_id: int = 1, + ) -> None: + self.process_group_outputs = process_group_outputs or {} + self.cur_process_group_id = cur_process_group_id + + def merge(self, other: ProcessGroupManagerOutput) -> None: + self.cur_process_group_id = other.cur_process_group_id + for key, value in other.process_group_outputs.items(): + if key in self.process_group_outputs: + self.process_group_outputs[key].merge(value) + else: + self.process_group_outputs[key] = value -@dataclass class ProcessGroupManager: - process_groups: list[ProcessGroup] - interactive: bool = False - colours: Colours = field(default_factory=Colours) + def __init__(self, process_groups: list[ProcessGroup]) -> None: + self._exit_code = 0 + self._interrupt_count = 0 + self._cur_process_group: ProcessGroup | None = None + self._process_groups = process_groups + self._output = ProcessGroupManagerOutput( + process_group_outputs={ + pg.id: ProcessGroupOutput( + id=pg.id, + processes=[ProcessOutput(id=p.id, process=p) for p in pg.processes], + ) + for pg in self._process_groups + } + ) - def stream(self) -> int: - exit_code = 0 + def run(self) -> None: + if self._process_groups: + self._cur_process_group = self._process_groups.pop(0) + self._cur_process_group.run() + else: + self._cur_process_group = None + + def next(self) -> bool: + return True if self._cur_process_group or self._process_groups else False + + def stream(self) -> ProcessGroupManagerOutput: + if self._cur_process_group is None: + return ProcessGroupManagerOutput() + + output = ProcessGroupManagerOutput( + cur_process_group_id=self._cur_process_group.id, + process_group_outputs={ + self._cur_process_group.id: self._cur_process_group.stream() + }, + ) - if not self.interactive: - print( - f"{self.colours.dim_on}=>{self.colours.dim_off} {self.colours.white_bold}Running commands...{self.colours.reset_colour}\n{self.colours.dim_on}=>{self.colours.dim_off} ", - flush=True, - ) + self._output.merge(output) + + return output + + def get_cur_process_group_output(self) -> ProcessGroupOutput: + if self._cur_process_group: + return self._output.process_group_outputs[self._cur_process_group.id] - for process_group in self.process_groups: - exit_code = process_group.stream() - if exit_code > 0: - break + raise KeyError("no current process group output") - return exit_code + def get_process(self, id: int) -> ProcessOutput: + for pg in self._output.process_group_outputs.values(): + for process in pg.processes: + if process.id == id: + return process + + raise KeyError(f"process with id '{id}' not found") + + def poll(self) -> int | None: + if self._cur_process_group is None: + return 0 + + poll = self._cur_process_group.poll() + + if poll is not None and self._exit_code: + return self._exit_code + + if self._interrupt_count > 1: + return self._exit_code + + return poll def handle_signal(self, signum: int, _frame: Any) -> None: - for process_group in self.process_groups: + for process_group in self._process_groups: process_group.handle_signal(signum) + self._exit_code = 128 + signum + self._interrupt_count += 1 + @classmethod - def from_args( - cls, - *args: str, - colours: Colours | None = None, - interactive: bool = False, - timer: bool = False, - ) -> ProcessGroupManager: - colours = colours or Colours() + def from_args(cls, *args: str) -> ProcessGroupManager: last_separator_index = 0 commands: list[str] = [] process_groups: list[ProcessGroup] = [] + progress_group_id = 1 + process_id = 1 for i, arg in enumerate(args): if arg == ":::": if i - 1 == 0: - process_groups.append( - ProcessGroup.from_commands( - args[0], - colours=colours, - interactive=interactive, - timer=timer, - ) + pg = ProcessGroup.from_commands( + progress_group_id, process_id, args[0] ) else: - process_groups.append( - ProcessGroup.from_commands( - *commands[last_separator_index:], - colours=colours, - interactive=interactive, - timer=timer, - ) + pg = ProcessGroup.from_commands( + progress_group_id, process_id, *commands[last_separator_index:] ) + process_groups.append(pg) + process_id += len(pg.processes) last_separator_index = i + progress_group_id += 1 continue commands.append(arg) @@ -78,16 +133,11 @@ def from_args( process_groups.append( ProcessGroup.from_commands( - *commands[last_separator_index:], - colours=colours, - interactive=interactive, - timer=timer, + progress_group_id, process_id, *commands[last_separator_index:] ) ) - process_group_manager = cls( - process_groups=process_groups, interactive=interactive, colours=colours - ) + process_group_manager = cls(process_groups=process_groups) signal.signal(signal.SIGINT, process_group_manager.handle_signal) signal.signal(signal.SIGTERM, process_group_manager.handle_signal) diff --git a/tests/assets/test_output.sh b/tests/assets/test_output.sh new file mode 100755 index 0000000..bebf2fe --- /dev/null +++ b/tests/assets/test_output.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +trap "echo received signal! && sleep 1" SIGINT SIGTERM + +echo "running a command..." +sleep 1 +echo -n "this is a " +for i in {1..100}; do + echo -n "very " +done +echo "long line" +echo -n "this line contains..." +sleep 1 +echo "some delayed output" +# sleep 1 +# for i in {1..25}; do +# echo "line $i" +# sleep 0.1 +# done +# sleep 1 +# echo "bye!" diff --git a/tests/test_main.py b/tests/test_main.py index a91aebd..c1bcdc5 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -25,8 +25,8 @@ class TestInteractiveMode: @pytest.fixture(autouse=True) def in_tty(self, monkeypatch: MonkeyPatch) -> None: - ## Trick pyallel into thinking we are in an interactive terminal - ## so we can test the interactive mode + # Trick pyallel into thinking we are in an interactive terminal + # so we can test the interactive mode monkeypatch.setattr(main.constants, "IN_TTY", True) # type: ignore[attr-defined] def test_run_single_command(self, capsys: CaptureFixture[str]) -> None: @@ -88,15 +88,6 @@ def test_run_timer_mode(self, capsys: CaptureFixture[str]) -> None: captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) - def test_handles_running_pyallel_within_pyallel( - self, capsys: CaptureFixture[str] - ) -> None: - exit_code = main.run( - "pyallel ./tests/assets/test_handle_multiple_signals.sh -t", "-t" - ) - captured = capsys.readouterr() - assert exit_code == 0, prettify_error(captured.out) - @pytest.mark.parametrize( "signal,exit_code", ((signal.SIGINT, 130), (signal.SIGTERM, 143)) ) @@ -142,90 +133,78 @@ def test_handles_multiple_signals_with_dependant_commands( class TestNonInteractiveMode: def test_run_single_command(self, capsys: CaptureFixture[str]) -> None: - exit_code = main.run("echo hi", "-n", "-t") + exit_code = main.run("echo hi", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[echo hi] running... \n", + "[echo hi] running... \n", f"{PREFIX}hi\n", - f"{PREFIX}[echo hi] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + "[echo hi] done ✔\n", + "\n", + "Done!\n", ] ) def test_run_single_command_failure(self, capsys: CaptureFixture[str]) -> None: - exit_code = main.run("exit 1", "-n", "-t") + exit_code = main.run("exit 1", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 1, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[exit 1] running... \n", - f"{PREFIX}[exit 1] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}Failed!\n", + "[exit 1] running... \n", + "[exit 1] failed ✘\n", + "\n", + "Failed!\n", ] ) def test_run_single_command_with_env(self, capsys: CaptureFixture[str]) -> None: - exit_code = main.run("TEST_VAR=1 echo hi", "-n", "-t") + exit_code = main.run("TEST_VAR=1 echo hi", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[TEST_VAR=1 echo hi] running... \n", + "[TEST_VAR=1 echo hi] running... \n", f"{PREFIX}hi\n", - f"{PREFIX}[TEST_VAR=1 echo hi] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + "[TEST_VAR=1 echo hi] done ✔\n", + "\n", + "Done!\n", ] ) def test_run_multiple_commands(self, capsys: CaptureFixture[str]) -> None: - exit_code = main.run("sleep 0.1; echo first", "echo hi", "-n", "-t") + exit_code = main.run("sleep 0.1; echo first", "echo hi", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[sleep 0.1; echo first] running... \n", + "[sleep 0.1; echo first] running... \n", f"{PREFIX}first\n", - f"{PREFIX}[sleep 0.1; echo first] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}[echo hi] running... \n", + "[sleep 0.1; echo first] done ✔\n", + "[echo hi] running... \n", f"{PREFIX}hi\n", - f"{PREFIX}[echo hi] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + "[echo hi] done ✔\n", + "\n", + "Done!\n", ] ) def test_run_multiple_commands_single_failure( self, capsys: CaptureFixture[str] ) -> None: - exit_code = main.run("exit 1", "echo hi", "-n", "-t") + exit_code = main.run("exit 1", "echo hi", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 1, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[exit 1] running... \n", - f"{PREFIX}[exit 1] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}[echo hi] running... \n", + "[exit 1] running... \n", + "[exit 1] failed ✘\n", + "[echo hi] running... \n", f"{PREFIX}hi\n", - f"{PREFIX}[echo hi] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Failed!\n", + "[echo hi] done ✔\n", + "\n", + "Failed!\n", ] ) @@ -233,77 +212,67 @@ def test_run_multiple_commands_multiple_failures( self, capsys: CaptureFixture[str], ) -> None: - exit_code = main.run("exit 1", "exit 1", "-n", "-t") + exit_code = main.run("exit 1", "exit 1", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 1, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[exit 1] running... \n", - f"{PREFIX}[exit 1] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}[exit 1] running... \n", - f"{PREFIX}[exit 1] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}Failed!\n", + "[exit 1] running... \n", + "[exit 1] failed ✘\n", + "[exit 1] running... \n", + "[exit 1] failed ✘\n", + "\n", + "Failed!\n", ] ) def test_run_mulitiple_dependant_commands( self, capsys: CaptureFixture[str] ) -> None: - exit_code = main.run("echo first", ":::", "echo hi", "-n", "-t") + exit_code = main.run("echo first", ":::", "echo hi", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[echo first] running... \n", + "[echo first] running... \n", f"{PREFIX}first\n", - f"{PREFIX}[echo first] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}[echo hi] running... \n", + "[echo first] done ✔\n", + "[echo hi] running... \n", f"{PREFIX}hi\n", - f"{PREFIX}[echo hi] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + "[echo hi] done ✔\n", + "\n", + "Done!\n", ] ) def test_run_mulitiple_dependant_commands_single_failure( self, capsys: CaptureFixture[str] ) -> None: - exit_code = main.run("exit 1", ":::", "echo hi", "-n", "-t") + exit_code = main.run("exit 1", ":::", "echo hi", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 1, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[exit 1] running... \n", - f"{PREFIX}[exit 1] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}Failed!\n", + "[exit 1] running... \n", + "[exit 1] failed ✘\n", + "\n", + "Failed!\n", ] ) def test_run_timer_mode(self, capsys: CaptureFixture[str]) -> None: - exit_code = main.run("echo hi", "-n") + exit_code = main.run("echo hi", "-n", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert ( re.search( "".join( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - rf"{PREFIX}\[echo hi\] running... \n", + r"\[echo hi\] running... \n", f"{PREFIX}hi\n", - rf"{PREFIX}\[echo hi\] done ✔ \(0\..*\)\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + r"\[echo hi\] done ✔ \(0\..*\)\n", + "\n", + "Done!\n", ] ), captured.out, @@ -312,23 +281,20 @@ def test_run_timer_mode(self, capsys: CaptureFixture[str]) -> None: ), prettify_error(captured.out) def test_run_with_longer_first_command(self, capsys: CaptureFixture[str]) -> None: - exit_code = main.run("sleep 1", "echo hi", "-n") + exit_code = main.run("sleep 1", "echo hi", "-n", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert ( re.search( "".join( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - rf"{PREFIX}\[sleep 1\] running... \n", - rf"{PREFIX}\[sleep 1\] done ✔ \(1\..*s\)\n", - f"{PREFIX}\n", - rf"{PREFIX}\[echo hi\] running... \n", + r"\[sleep 1\] running... \n", + r"\[sleep 1\] done ✔ \(1\..*s\)\n", + r"\[echo hi\] running... \n", f"{PREFIX}hi\n", - rf"{PREFIX}\[echo hi\] done ✔ \(0\..*s\)\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + r"\[echo hi\] done ✔ \(0\..*s\)\n", + "\n", + "Done!\n", ] ), captured.out, @@ -336,49 +302,20 @@ def test_run_with_longer_first_command(self, capsys: CaptureFixture[str]) -> Non is not None ), prettify_error(captured.out) - def test_handles_running_pyallel_within_pyallel( - self, capsys: CaptureFixture[str] - ) -> None: - exit_code = main.run( - "pyallel ./tests/assets/test_handle_multiple_signals.sh -t", "-n", "-t" - ) - captured = capsys.readouterr() - assert exit_code == 0, prettify_error(captured.out) - assert captured.out.splitlines(keepends=True) == ( - [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[pyallel ./tests/assets/test_handle_multiple_signals.sh -t] running... \n", - f"{PREFIX}{PREFIX}Running commands...\n", - f"{PREFIX}{PREFIX}\n", - f"{PREFIX}{PREFIX}[./tests/assets/test_handle_multiple_signals.sh] running... \n", - f"{PREFIX}{PREFIX}hi\n", - f"{PREFIX}{PREFIX}bye\n", - f"{PREFIX}{PREFIX}[./tests/assets/test_handle_multiple_signals.sh] done ✔\n", - f"{PREFIX}{PREFIX}\n", - f"{PREFIX}{PREFIX}Done!\n", - f"{PREFIX}[pyallel ./tests/assets/test_handle_multiple_signals.sh -t] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", - ] - ) - @pytest.mark.parametrize("wait", ["0.1", "0.5"]) def test_handles_single_command_output_with_delayed_newlines( self, capsys: CaptureFixture[str], wait: str ) -> None: - exit_code = main.run(f"printf hi; sleep {wait}; echo bye", "-n", "-t") + exit_code = main.run(f"printf hi; sleep {wait}; echo bye", "-n", "-t", "--colour", "no") captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[printf hi; sleep {wait}; echo bye] running... \n", + f"[printf hi; sleep {wait}; echo bye] running... \n", f"{PREFIX}hibye\n", - f"{PREFIX}[printf hi; sleep {wait}; echo bye] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + f"[printf hi; sleep {wait}; echo bye] done ✔\n", + "\n", + "Done!\n", ] ) @@ -390,23 +327,22 @@ def test_handles_multiple_command_output_with_delayed_newlines( f"printf hi; sleep {wait}; echo bye", f"printf hi; sleep {wait}; echo bye", "-n", - "-t", + "-t", + "--colour", + "no", ) captured = capsys.readouterr() assert exit_code == 0, prettify_error(captured.out) assert captured.out.splitlines(keepends=True) == ( [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[printf hi; sleep {wait}; echo bye] running... \n", + f"[printf hi; sleep {wait}; echo bye] running... \n", f"{PREFIX}hibye\n", - f"{PREFIX}[printf hi; sleep {wait}; echo bye] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}[printf hi; sleep {wait}; echo bye] running... \n", + f"[printf hi; sleep {wait}; echo bye] done ✔\n", + f"[printf hi; sleep {wait}; echo bye] running... \n", f"{PREFIX}hibye\n", - f"{PREFIX}[printf hi; sleep {wait}; echo bye] done ✔\n", - f"{PREFIX}\n", - f"{PREFIX}Done!\n", + f"[printf hi; sleep {wait}; echo bye] done ✔\n", + "\n", + "Done!\n", ] ) @@ -420,6 +356,8 @@ def test_handles_multiple_signals(self, signal: int, exit_code: int) -> None: "./tests/assets/test_handle_multiple_signals.sh", "-n", "-t", + "--colour", + "no", ], env=os.environ.copy(), stdout=subprocess.PIPE, @@ -432,20 +370,6 @@ def test_handles_multiple_signals(self, signal: int, exit_code: int) -> None: assert process.stdout is not None out = process.stdout.read() assert process.wait() == exit_code, prettify_error(out.decode()) - assert out.decode().splitlines(keepends=True) == ( - [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[./tests/assets/test_handle_multiple_signals.sh] running... \n", - f"{PREFIX}hi\n", - f"{PREFIX}\n", - f"{PREFIX}Interrupt!\n", - f"{PREFIX}\n", - f"{PREFIX}[./tests/assets/test_handle_multiple_signals.sh] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}Abort!\n", - ] - ) @pytest.mark.parametrize( "signal,exit_code", ((signal.SIGINT, 130), (signal.SIGTERM, 143)) @@ -461,6 +385,8 @@ def test_handles_multiple_signals_with_dependant_commands( "./tests/assets/test_handle_multiple_signals.sh", "-n", "-t", + "--colour", + "no", ], env=os.environ.copy(), stdout=subprocess.PIPE, @@ -473,17 +399,3 @@ def test_handles_multiple_signals_with_dependant_commands( assert process.stdout is not None out = process.stdout.read() assert process.wait() == exit_code, prettify_error(out.decode()) - assert out.decode().splitlines(keepends=True) == ( - [ - f"{PREFIX}Running commands...\n", - f"{PREFIX}\n", - f"{PREFIX}[./tests/assets/test_handle_multiple_signals.sh] running... \n", - f"{PREFIX}hi\n", - f"{PREFIX}\n", - f"{PREFIX}Interrupt!\n", - f"{PREFIX}\n", - f"{PREFIX}[./tests/assets/test_handle_multiple_signals.sh] failed ✘\n", - f"{PREFIX}\n", - f"{PREFIX}Abort!\n", - ] - ) diff --git a/tests/test_printer.py b/tests/test_printer.py new file mode 100644 index 0000000..5561b32 --- /dev/null +++ b/tests/test_printer.py @@ -0,0 +1,141 @@ +import pytest +from pyallel.colours import Colours +from pyallel.printer import Printer, get_num_lines, set_process_lines +from pyallel.process import Process, ProcessOutput +from pyallel.process_group import ProcessGroupOutput + + +@pytest.mark.parametrize( + "output,columns,expected", + ( + ( + "Hello Mr Anderson", + 20, + 1, + ), + ( + "Hello Mr Anderson\nIt is inevitable", + 20, + 2, + ), + ( + "Hello Mr Anderson\nIt is inevitable\nHAHAHAHAH", + 20, + 3, + ), + ), +) +def test_get_num_lines(output: str, columns: int, expected: int) -> None: + assert get_num_lines(output, columns) == expected + + +@pytest.mark.parametrize("columns,lines", ((8, 3), (5, 4))) +def test_get_num_lines_with_columns(columns: int, lines: int) -> None: + assert get_num_lines("Hello Mr Anderson", columns=columns) == lines + + +def test_get_num_lines_with_long_command() -> None: + # First line is a 800 length string, which divides evenly into `200` + line = "long" * 200 + assert get_num_lines(f"{line}\nLong output", columns=200) == 5 + + +def test_get_num_lines_with_long_line() -> None: + assert get_num_lines(" " * 250, columns=200) == 2 + + +@pytest.mark.parametrize("chars", ["\x1B[0m", "\x1B(B"]) +def test_get_num_lines_ignores_ansi_chars(chars: str) -> None: + assert get_num_lines(chars * 100, columns=10) == 1 + + +def test_set_process_lines() -> None: + output = ProcessGroupOutput( + id=1, + processes=[ + ProcessOutput( + id=1, + process=Process(1, "echo first; echo second"), + data="first\nsecond\n", + ) + ], + ) + + set_process_lines(output, lines=58) + + assert output.processes[0].lines == 58 + + +def test_printer_generate_process_output() -> None: + printer = Printer(colours=Colours.from_colour("no")) + process = Process(1, "echo first; echo second") + process.run() + process.wait() + + output = printer.generate_process_output( + ProcessOutput( + id=1, + process=process, + data="first\nsecond\n", + ) + ) + + assert output == [ + (False, "[echo first; echo second] done ✔", "\n"), + (True, "first", "\n"), + (True, "second", "\n"), + ] + + +def test_printer_generate_process_output_status() -> None: + printer = Printer(colours=Colours.from_colour("no")) + process = Process(1, "echo first; echo second") + process.run() + process.wait() + + output = printer.generate_process_output_status( + ProcessOutput( + id=1, + process=process, + data="first\nsecond\n", + ) + ) + + assert output == "[echo first; echo second] done ✔" + + +def test_printer_generate_process_group_output() -> None: + printer = Printer(colours=Colours.from_colour("no")) + process1 = Process(1, "echo first; echo second") + process2 = Process(1, "echo third; echo fourth") + process1.run() + process2.run() + process1.wait() + process2.wait() + + output = printer.generate_process_group_output( + ProcessGroupOutput( + id=1, + processes=[ + ProcessOutput( + id=1, + process=process1, + data="first\nsecond\n", + ), + ProcessOutput( + id=2, + process=process2, + data="third\nfourth\n", + ), + ], + ), + ) + + assert output == [ + (False, "[echo first; echo second] done ✔", "\n"), + (True, "first", "\n"), + (True, "second", "\n"), + (False, "[echo third; echo fourth] done ✔", "\n"), + (True, "third", "\n"), + (True, "fourth", "\n"), + ] diff --git a/tests/test_process_group.py b/tests/test_process_group.py index 61af480..ef94b2d 100644 --- a/tests/test_process_group.py +++ b/tests/test_process_group.py @@ -1,65 +1,77 @@ from __future__ import annotations +import time -import pytest -from pyallel.process import Process -from pyallel.process_group import ProcessGroup, get_num_lines - - -def test_from_command() -> None: - expected_process = Process(id=1, command="sleep 0.1") - process = Process(1, "sleep 0.1") - assert process == expected_process +from pyallel.process import Process, ProcessOutput +from pyallel.process_group import ProcessGroupOutput, ProcessGroup def test_from_commands() -> None: expected_process_group = ProcessGroup( + id=1, processes=[ Process(id=1, command="sleep 0.1"), Process(id=2, command="sleep 0.2"), Process(id=3, command="sleep 0.3"), - ] + ], ) - process_group = ProcessGroup.from_commands("sleep 0.1", "sleep 0.2", "sleep 0.3") - assert process_group == expected_process_group - - -@pytest.mark.parametrize( - "output,expected", - ( - ( - "Hello Mr Anderson", - 1, - ), - ( - "Hello Mr Anderson\nIt is inevitable", - 2, - ), - ( - "Hello Mr Anderson\nIt is inevitable\nHAHAHAHAH", - 3, - ), - ), -) -def test_get_num_lines(output: str, expected: int) -> None: - assert get_num_lines(output) == expected - - -@pytest.mark.parametrize("columns,lines", ((8, 3), (5, 4))) -def test_get_num_lines_with_columns(columns: int, lines: int) -> None: - assert get_num_lines("Hello Mr Anderson", columns=columns) == lines + process_group = ProcessGroup.from_commands( + 1, 1, "sleep 0.1", "sleep 0.2", "sleep 0.3" + ) + assert process_group.id == expected_process_group.id + assert len(process_group.processes) == len(expected_process_group.processes) -def test_get_num_lines_with_long_command() -> None: - # First line is a 800 length string, which divides evenly into `200` - line = "long" * 200 - assert get_num_lines(f"{line}\nLong output", columns=200) == 5 +def test_stream() -> None: + process_group = ProcessGroup( + id=1, + processes=[ + Process(id=1, command="echo first; echo hi"), + Process(id=2, command="echo second"), + Process(id=3, command="echo third"), + ], + ) + process_group.run() + time.sleep(0.1) + output = process_group.stream() + assert len(output.processes) == 3 -def test_get_num_lines_with_long_line() -> None: - assert get_num_lines(" " * 250, columns=200) == 2 +def test_output_merge() -> None: + output = ProcessGroupOutput( + id=1, + processes=[ + ProcessOutput( + id=1, + process=Process(id=1, command="echo first; echo hi"), + data="first\nhi\n", + ), + ProcessOutput( + id=1, process=Process(id=2, command="echo second"), data="second\n" + ), + ProcessOutput( + id=3, process=Process(id=3, command="echo third"), data="third\n" + ), + ], + ) + output.merge( + ProcessGroupOutput( + id=1, + processes=[ + ProcessOutput( + id=1, + process=Process(id=1, command="echo first; echo hi"), + data="bye\n", + ), + ProcessOutput( + id=1, process=Process(id=2, command="echo second"), data="hi\n" + ), + ProcessOutput( + id=3, process=Process(id=3, command="echo third"), data="five\n" + ), + ], + ) + ) -@pytest.mark.parametrize("chars", ["\x1B[0m", "\x1B(B"]) -def test_get_num_lines_ignores_ansi_chars(chars: str) -> None: - assert get_num_lines(chars * 100, columns=10) == 1 + assert len(output.processes) == 3 diff --git a/tests/test_process_manager.py b/tests/test_process_manager.py index 1fa73c4..2e14cd7 100644 --- a/tests/test_process_manager.py +++ b/tests/test_process_manager.py @@ -1,4 +1,5 @@ from __future__ import annotations +import time import pytest @@ -7,19 +8,59 @@ from pyallel.process_group_manager import ProcessGroupManager +def test_stream() -> None: + pg_manager = ProcessGroupManager( + process_groups=[ + ProcessGroup( + id=1, + processes=[ + Process(id=1, command="echo first"), + Process(id=2, command="echo second"), + ], + ), + ProcessGroup( + id=2, + processes=[ + Process(id=3, command="echo third"), + Process(id=4, command="echo fourth"), + ], + ), + ], + ) + pg_manager.run() + pg_manager.get_cur_process_group_output() + time.sleep(0.1) + output = pg_manager.stream() + assert len(output.process_group_outputs) == 1 + assert output.process_group_outputs[1].id == 1 + assert len(output.process_group_outputs[1].processes) == 2 + assert pg_manager.poll() == 0 + pg_manager.run() + pg_manager.get_cur_process_group_output() + time.sleep(0.1) + output = pg_manager.stream() + assert len(output.process_group_outputs) == 1 + assert output.process_group_outputs[2].id == 2 + assert len(output.process_group_outputs[2].processes) == 2 + assert pg_manager.poll() == 0 + + def test_from_args() -> None: expected_process_group_manager = ProcessGroupManager( process_groups=[ ProcessGroup( + id=1, processes=[ Process(id=1, command="sleep 0.1"), Process(id=2, command="sleep 0.2"), - ] + ], ) ] ) process_group_manager = ProcessGroupManager.from_args("sleep 0.1", "sleep 0.2") - assert process_group_manager == expected_process_group_manager + assert len(process_group_manager._process_groups) == len( + expected_process_group_manager._process_groups + ) @pytest.mark.parametrize( @@ -30,22 +71,25 @@ def test_from_args() -> None: ProcessGroupManager( process_groups=[ ProcessGroup( + id=1, processes=[ Process(id=1, command="sleep 0.1"), ], ), ProcessGroup( + id=2, processes=[ - Process(id=1, command="sleep 0.2"), - Process(id=2, command="sleep 0.3"), - ] + Process(id=2, command="sleep 0.2"), + Process(id=3, command="sleep 0.3"), + ], ), ProcessGroup( + id=3, processes=[ - Process(id=1, command="sleep 0.4"), - ] + Process(id=4, command="sleep 0.4"), + ], ), - ] + ], ), ), ( @@ -62,24 +106,27 @@ def test_from_args() -> None: ProcessGroupManager( process_groups=[ ProcessGroup( + id=1, processes=[ Process(id=1, command="sleep 0.1"), Process(id=2, command="sleep 0.2"), ], ), ProcessGroup( + id=2, processes=[ - Process(id=1, command="sleep 0.3"), - Process(id=2, command="sleep 0.4"), + Process(id=3, command="sleep 0.3"), + Process(id=4, command="sleep 0.4"), ], ), ProcessGroup( + id=3, processes=[ - Process(id=1, command="sleep 0.5"), - Process(id=2, command="sleep 0.6"), + Process(id=5, command="sleep 0.5"), + Process(id=6, command="sleep 0.6"), ], ), - ] + ], ), ), ), @@ -88,4 +135,6 @@ def test_from_args_with_separators( args: list[str], expected_process_group_manager: ProcessGroupManager ) -> None: process_group_manager = ProcessGroupManager.from_args(*args) - assert process_group_manager == expected_process_group_manager + assert len(process_group_manager._process_groups) == len( + expected_process_group_manager._process_groups + )