From 612b1e293697a7b548f13421f0e09cae29708b54 Mon Sep 17 00:00:00 2001 From: Benjamin Nussbaum <50522055+Benjamin-Nussbaum@users.noreply.github.com> Date: Thu, 30 Oct 2025 17:06:02 -0400 Subject: [PATCH 1/9] Add a pull request template (#122) --- .../pull_request_template.md | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 .github/PULL_REQUEST_TEMPLATE/pull_request_template.md diff --git a/.github/PULL_REQUEST_TEMPLATE/pull_request_template.md b/.github/PULL_REQUEST_TEMPLATE/pull_request_template.md new file mode 100644 index 00000000..66692249 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE/pull_request_template.md @@ -0,0 +1,37 @@ + + +## Description + + + +## Motivation and Context + + + + +## How have these changes been tested? + + + + + +## Screenshots (if appropriate): + +## Types of changes + + + +- [ ] Bug fix (non-breaking change which fixes an issue) +- [ ] New feature (non-breaking change which adds functionality) +- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) + +## Checklist: + + + + +- [ ] My code follows the code style of this project. +- [ ] My changes require changes to the documentation. +- [ ] I have updated the documentation accordingly. +- [ ] I have added tests to cover my changes. +- [ ] All new and existing tests pass. From dde012322e17a2af9f78db93c3d8df15772f76ef Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 30 Oct 2025 16:19:04 -0500 Subject: [PATCH 2/9] Add Ell14K rotator driver --- src/pqnstack/pqn/drivers/rotator.py | 571 +++++++++++++++++++++++++++- 1 file changed, 570 insertions(+), 1 deletion(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 4f76a55f..8ef2808a 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -147,7 +147,6 @@ def info(self) -> RotatorInfo: name=self.name, desc=self.desc, hw_address=self.hw_address, - # hw_status=, degrees=self.degrees, offset_degrees=self.offset_degrees, ) @@ -161,3 +160,573 @@ def degrees(self, degrees: float) -> None: self._conn.write(f"SRA {degrees}".encode()) self._degrees = degrees _ = self._conn.readline().decode() + + +@dataclass(slots=True) +class ELL14KRotator(RotatorInstrument): + """ + Implement a Thorlabs Elliptec ELL14K rotator over the Elliptec ASCII protocol. + + Inputs: + name: Logical name. + desc: Description. + hw_address: Serial port path. + offset_degrees: Static mechanical zero offset in degrees. + addr_hex: One-hex-digit device address string in [0-9A-F]. If unknown, leave "0". + block_while_moving: If True, motion calls wait until motion stops. + timeout_s: I/O timeout in seconds. + handshake_retries: Maximum discovery passes across candidate addresses. + home_on_start: If True, homes after successful handshake. + home_dir_cw: If True, home clockwise; otherwise counterclockwise. + + Outputs: + None. Use .info or .degrees to read current state. + """ + + name: str + desc: str + hw_address: str + offset_degrees: float = 0.0 + addr_hex: str = "0" + block_while_moving: bool = True + timeout_s: float = 2.5 + handshake_retries: int = 3 + home_on_start: bool = False + home_dir_cw: bool = True + + _degrees: float = 0.0 + _conn: serial.Serial = field(init=False, repr=False) + _ppd: float = field(init=False, default=0.0, repr=False) + _travel_deg: int = field(init=False, default=360, repr=False) + _raw_in: str = field(init=False, default="", repr=False) + + _IN_MIN_PARTS: int = field(init=False, default=9, repr=False) + _IN_FIXED_MIN_LEN: int = field(init=False, default=30, repr=False) + _WAIT_TIMEOUT_S: float = field(init=False, default=30.0, repr=False) + _DRAIN_SLEEP_S: float = field(init=False, default=0.005, repr=False) + + def start(self) -> None: + """ + Open, identify, scale, and synchronize. + + Inputs: + None. + + Outputs: + None. Raises RuntimeError on identification failure. + """ + self._open_port() + parsed, addr = self._identify() + self.addr_hex = addr + self._init_scale(parsed) + self._maybe_home() + self._sync_angle() + + def close(self) -> None: + """ + Return to zero and close the port. + + Inputs: + None. + + Outputs: + None. Logs warnings on failure and continues. + """ + try: + self.degrees = 0.0 + except (OSError, RuntimeError) as exc: + logger.warning("ell14k.close.zero_failed err=%r", exc) + try: + self._conn.close() + except (OSError, RuntimeError) as exc: + logger.warning("ell14k.close.serial_failed err=%r", exc) + + @property + def info(self) -> RotatorInfo: + """ + Return a snapshot of metadata and current angle. + + Inputs: + None. + + Outputs: + RotatorInfo with name, description, port, degrees, and offset. + """ + return RotatorInfo( + name=self.name, + desc=self.desc, + hw_address=self.hw_address, + degrees=self.degrees, + offset_degrees=self.offset_degrees, + ) + + @property + def degrees(self) -> float: + """ + Get the cached current angle in degrees referenced to the configured offset. + + Inputs: + None. + + Outputs: + Float in [0, 360). + """ + return self._degrees + + @degrees.setter + def degrees(self, degrees: float) -> None: + """ + Move to an absolute mechanical angle in degrees. + + Inputs: + degrees: Target angle referenced to user offset. Wrapped into [0, 360). + + Outputs: + None. Blocks until motion completes when block_while_moving is True. Updates cached angle using device readback. + """ + target = (degrees + self.offset_degrees) % 360.0 + eu = self._deg_to_eu(target) + cmd = f"{self.addr_hex}ma{eu:08X}" + gs0 = self._get_status() + t0 = time.time() + self._send(cmd) + logger.info("ell14k.move_to tx=%s target_deg=%.9f eu=%08X status_before=%r", cmd, target, eu, gs0) + if self.block_while_moving: + self._wait_for_completion() + pos = self._get_position_eu() + gs1 = self._get_status() + if pos is None: + self._degrees = degrees % 360.0 + logger.warning( + "ell14k.move_to.readback_none kept_req_deg=%.9f elapsed=%.3fs status_after=%r", + degrees, + time.time() - t0, + gs1, + ) + return + rb = (self._eu_to_deg(pos) - self.offset_degrees) % 360.0 + self._degrees = rb + logger.info( + "ell14k.move_to.readback po_eu=%08X rb_deg=%.9f elapsed=%.3fs status_after=%r", + pos, + rb, + time.time() - t0, + gs1, + ) + + def move_to(self, angle: float) -> None: + """ + Set the absolute angle in degrees. + + Inputs: + angle: Target angle referenced to user offset. Wrapped into [0, 360). + + Outputs: + None. + """ + self.degrees = angle + + def move_by(self, angle: float) -> None: + """ + Move by a signed delta in degrees relative to the current position. + + Inputs: + angle: Signed delta in degrees. Positive is CW. Negative is CCW. + + Outputs: + None. Blocks until motion completes when block_while_moving is True. Updates cached angle using device readback. + """ + delta_eu = self._deg_to_eu(angle) + dlabel = delta_eu if angle >= 0 else (1 << 32) + (-delta_eu) + cmd = f"{self.addr_hex}mr{int(dlabel):08X}" + gs0 = self._get_status() + t0 = time.time() + self._send(cmd) + logger.info( + "ell14k.move_by tx=%s delta_deg=%.9f delta_eu=%08X status_before=%r", + cmd, + angle, + int(dlabel) & 0xFFFFFFFF, + gs0, + ) + if self.block_while_moving: + self._wait_for_completion() + pos = self._get_position_eu() + gs1 = self._get_status() + if pos is None: + self._degrees = (self._degrees + angle) % 360.0 + logger.warning( + "ell14k.move_by.readback_none fallback_deg=%.9f elapsed=%.3fs status_after=%r", + self._degrees, + time.time() - t0, + gs1, + ) + return + rb = (self._eu_to_deg(pos) - self.offset_degrees) % 360.0 + self._degrees = rb + logger.info( + "ell14k.move_by.readback po_eu=%08X rb_deg=%.9f elapsed=%.3fs status_after=%r", + pos, + rb, + time.time() - t0, + gs1, + ) + + def _open_port(self) -> None: + """ + Open and prime the serial link. + + Inputs: + None. + + Outputs: + None. + """ + logger.info("ell14k.start port=%s req_addr=%s timeout=%.2f", self.hw_address, self.addr_hex, self.timeout_s) + self._conn = serial.Serial( + self.hw_address, + baudrate=9600, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=self.timeout_s, + write_timeout=self.timeout_s, + rtscts=False, + dsrdtr=False, + xonxoff=False, + ) + self._conn.reset_input_buffer() + self._conn.reset_output_buffer() + self._send("") + self._drain_reads(0.05) + + def _identify(self) -> tuple[dict[str, str], str]: + """ + Identify the device and resolve the active address. + + Inputs: + None. + + Outputs: + Tuple of (parsed IN fields, confirmed address). + """ + addrs = [self.addr_hex.upper()] + [a for a in "0123456789ABCDEF" if a != self.addr_hex.upper()] + ok_addr: str | None = None + parsed: dict[str, str] | None = None + for attempt in range(1, self.handshake_retries + 1): + for a in addrs: + self._drain_reads(0.01) + t0 = time.time() + self._send(f"{a}in") + line = self._readline_ascii() + dt = time.time() - t0 + logger.info( + "ell14k.in attempt=%d addr=%s rx=%r latency=%.3fs in_wait=%d", + attempt, + a, + line, + dt, + int(self._conn.in_waiting), + ) + if not line or not line.startswith(f"{a}IN"): + continue + self._raw_in = line + try: + parsed = self._parse_in(line) + except ValueError: + logger.exception("ell14k.in.parse addr=%s line=%r", a, line) + continue + ok_addr = a + break + if ok_addr: + break + self._send("") + self._drain_reads(0.05) + if not ok_addr or not parsed: + logger.error("ell14k.start.identify_failed") + msg = "ELL14K identify timed out" + raise RuntimeError(msg) + return parsed, ok_addr + + def _init_scale(self, parsed: dict[str, str]) -> None: + """ + Initialize travel and pulses-per-degree scale. + + Inputs: + parsed: Result of _parse_in. + + Outputs: + None. + """ + self._travel_deg = int(parsed.get("travel_hex", "168"), 16) if parsed.get("travel_hex") else 360 + ppu_hex = parsed.get("pulses_per_unit_hex", "00000000") + pulses_val = int(ppu_hex, 16) if ppu_hex else 0 + if self._travel_deg > 0 and pulses_val > 0: + self._ppd = float(pulses_val) / float(self._travel_deg) + else: + self._ppd = 262144.0 / 360.0 + logger.info( + "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d ppd=%.9f raw_in=%r", + self.addr_hex, + self._travel_deg, + ppu_hex, + pulses_val, + self._ppd, + self._raw_in, + ) + + def _maybe_home(self) -> None: + """ + Perform an optional home cycle. + + Inputs: + None. + + Outputs: + None. + """ + if not self.home_on_start: + return + dir_code = "0" if self.home_dir_cw else "1" + t0 = time.time() + self._send(f"{self.addr_hex}ho{dir_code}") + logger.info("ell14k.home cmd=%s", f"{self.addr_hex}ho{dir_code}") + self._wait_for_completion() + logger.info("ell14k.home.done elapsed=%.3fs", time.time() - t0) + + def _sync_angle(self) -> None: + """ + Read back position and update cached degrees. + + Inputs: + None. + + Outputs: + None. + """ + pos_eu = self._get_position_eu() + if pos_eu is not None: + self._degrees = (self._eu_to_deg(pos_eu) - self.offset_degrees) % 360.0 + else: + self._degrees = 0.0 + gs_after = self._get_status() + logger.info("ell14k.start.sync degrees=%.9f status_after=%r", self._degrees, gs_after) + + def _send(self, payload: str) -> None: + """ + Transmit a single Elliptec command with CR termination. + + Inputs: + payload: ASCII command excluding terminator. May be empty to send CR only. + + Outputs: + None. + """ + tx = (payload + "\r").encode("ascii") + n = self._conn.write(tx) + logger.debug("ell14k.tx bytes=%d data=%r", n, tx) + + def _readline_ascii(self) -> str | None: + """ + Read one CRLF-terminated line as ASCII and strip line endings. + + Inputs: + None. + + Outputs: + Decoded string or None on timeout. + """ + raw = self._conn.readline() + if not raw: + return None + s = raw.decode("ascii", errors="ignore").strip("\r\n") + logger.debug("ell14k.rx line=%r", s) + return str(s) + + def _cmd(self, cmd: str) -> list[str]: + """ + Send one command and collect the immediate reply burst. + + Inputs: + cmd: ASCII command without terminator. + + Outputs: + List of decoded reply lines received before timeout or input drain. + """ + self._send(cmd) + lines: list[str] = [] + t0 = time.time() + while time.time() - t0 < self.timeout_s: + line = self._readline_ascii() + if not line: + break + lines.append(line) + if self._conn.in_waiting == 0: + break + logger.debug("ell14k.cmd tx=%r rx_lines=%r", cmd, lines) + return lines + + def _drain_reads(self, duration_s: float) -> None: + """ + Drain any pending input bytes for a fixed duration. + + Inputs: + duration_s: Seconds to spend draining. + + Outputs: + None. + """ + t0 = time.time() + total = 0 + while time.time() - t0 < duration_s: + n = int(self._conn.in_waiting) + if n <= 0: + time.sleep(self._DRAIN_SLEEP_S) + continue + _ = self._conn.read(n) + total += n + if total: + logger.debug("ell14k.drain bytes=%d", total) + + def _parse_in(self, line: str) -> dict[str, str]: + """ + Decode an IN identification reply in CSV or fixed-field format. + + Inputs: + line: One IN reply line starting with 'IN'. + + Outputs: + Dict with keys: ell, sn, year, fw, hw, travel_hex, pulses_per_unit_hex. + """ + if "," in line: + parts = line.split(",") + if len(parts) < self._IN_MIN_PARTS: + msg = f"bad IN csv: {line!r}" + raise ValueError(msg) + return { + "ell": parts[2], + "sn": parts[3], + "year": parts[4], + "fw": parts[5], + "hw": parts[6], + "travel_hex": parts[7], + "pulses_per_unit_hex": parts[8], + } + data = line[3:] + if len(data) < self._IN_FIXED_MIN_LEN: + msg = f"bad IN fixed: {line!r}" + raise ValueError(msg) + return { + "ell": data[0:2], + "sn": data[2:10], + "year": data[10:14], + "fw": data[14:16], + "hw": data[16:18], + "travel_hex": data[18:22], + "pulses_per_unit_hex": data[22:30], + } + + def _get_position_eu(self) -> int | None: + """ + Query the device for the current encoder units position. + + Inputs: + None. + + Outputs: + Unsigned 32-bit integer parsed from PO field, or None on failure. + """ + reps = self._cmd(f"{self.addr_hex}gp") + if not reps: + logger.debug("ell14k.gp no_reply") + return None + rep = reps[0] + idx = rep.find("PO") + if idx < 0 or len(rep) < idx + 10: + logger.debug("ell14k.gp bad_line=%r", rep) + return None + try: + val = int(rep[idx + 2 : idx + 10], 16) + except ValueError as exc: + logger.debug("ell14k.gp parse_error line=%r err=%r", rep, exc) + return None + else: + logger.debug("ell14k.gp po=%08X line=%r", val, rep) + return val + + def _get_status(self) -> dict[str, str] | None: + """ + Fetch the raw GS status line. + + Inputs: + None. + + Outputs: + Dict with key 'raw' holding the first reply line, or None on timeout. + """ + reps = self._cmd(f"{self.addr_hex}gs") + if not reps: + return None + return {"raw": reps[0]} + + def _wait_for_completion(self) -> None: + """ + Poll until motion stops or the watchdog expires. + + Inputs: + None. + + Outputs: + None. Returns when two successive PO reads are equal or on timeout. + """ + t0 = time.time() + last_po: int | None = None + last_gs: str | None = None + while time.time() - t0 < self._WAIT_TIMEOUT_S: + reps = self._cmd(f"{self.addr_hex}gp") + if reps: + rep = reps[0] + if "PO" in rep: + try: + pos_start = rep.find("PO") + 2 + cur = int(rep[pos_start : pos_start + 8], 16) + except ValueError as exc: + logger.debug("ell14k.wait parse_error line=%r err=%r", rep, exc) + else: + if last_po is not None and cur == last_po: + logger.debug("ell14k.wait stable_po=%08X", cur) + return + last_po = cur + gs = self._get_status() + if gs and gs.get("raw") != last_gs: + last_gs = gs.get("raw") + logger.debug("ell14k.wait status=%r", last_gs) + time.sleep(0.05) + logger.warning("ell14k.wait timeout") + + def _deg_to_eu(self, deg: float) -> int: + """ + Encode degrees into encoder units using the discovered scale. + + Inputs: + deg: Angle in degrees. Wrapped into [0, 360). + + Outputs: + Unsigned 32-bit integer within one revolution. + """ + return round((deg % 360.0) * self._ppd) % int(self._ppd * 360.0) if self._ppd > 0 else 0 + + def _eu_to_deg(self, eu: int | None) -> float: + """ + Decode encoder units back to degrees using the discovered scale. + + Inputs: + eu: Unsigned 32-bit integer position or None. + + Outputs: + Angle in degrees in [0, 360). Returns 0.0 if scale unknown or input None. + """ + if eu is None or self._ppd <= 0: + return 0.0 + return ((eu % int(self._ppd * 360.0)) / float(self._ppd)) % 360.0 + From 77f2623e40caaef276d4658e66183692e6643940 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Fri, 31 Oct 2025 12:31:09 -0500 Subject: [PATCH 3/9] Ruff format --- src/pqnstack/pqn/drivers/rotator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 8ef2808a..1ac2a145 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -729,4 +729,3 @@ def _eu_to_deg(self, eu: int | None) -> float: if eu is None or self._ppd <= 0: return 0.0 return ((eu % int(self._ppd * 360.0)) / float(self._ppd)) % 360.0 - From 704444bc4fc7a37f7a6018549a9741c865e271ce Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Fri, 31 Oct 2025 14:31:48 -0500 Subject: [PATCH 4/9] Remove extra pr template --- .../pull_request_template.md | 37 ------------------- 1 file changed, 37 deletions(-) delete mode 100644 .github/PULL_REQUEST_TEMPLATE/pull_request_template.md diff --git a/.github/PULL_REQUEST_TEMPLATE/pull_request_template.md b/.github/PULL_REQUEST_TEMPLATE/pull_request_template.md deleted file mode 100644 index 66692249..00000000 --- a/.github/PULL_REQUEST_TEMPLATE/pull_request_template.md +++ /dev/null @@ -1,37 +0,0 @@ - - -## Description - - - -## Motivation and Context - - - - -## How have these changes been tested? - - - - - -## Screenshots (if appropriate): - -## Types of changes - - - -- [ ] Bug fix (non-breaking change which fixes an issue) -- [ ] New feature (non-breaking change which adds functionality) -- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - -## Checklist: - - - - -- [ ] My code follows the code style of this project. -- [ ] My changes require changes to the documentation. -- [ ] I have updated the documentation accordingly. -- [ ] I have added tests to cover my changes. -- [ ] All new and existing tests pass. From 0eae951603eabe9b0a16676e40097469ba54ef3e Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Fri, 31 Oct 2025 15:18:09 -0500 Subject: [PATCH 5/9] Remove redundancy --- src/pqnstack/pqn/drivers/rotator.py | 150 +--------------------------- 1 file changed, 5 insertions(+), 145 deletions(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 1ac2a145..43e55784 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -178,9 +178,6 @@ class ELL14KRotator(RotatorInstrument): handshake_retries: Maximum discovery passes across candidate addresses. home_on_start: If True, homes after successful handshake. home_dir_cw: If True, home clockwise; otherwise counterclockwise. - - Outputs: - None. Use .info or .degrees to read current state. """ name: str @@ -206,15 +203,7 @@ class ELL14KRotator(RotatorInstrument): _DRAIN_SLEEP_S: float = field(init=False, default=0.005, repr=False) def start(self) -> None: - """ - Open, identify, scale, and synchronize. - - Inputs: - None. - - Outputs: - None. Raises RuntimeError on identification failure. - """ + """Open, identify, scale, and synchronize.""" self._open_port() parsed, addr = self._identify() self.addr_hex = addr @@ -223,15 +212,7 @@ def start(self) -> None: self._sync_angle() def close(self) -> None: - """ - Return to zero and close the port. - - Inputs: - None. - - Outputs: - None. Logs warnings on failure and continues. - """ + """Return to zero and close the port.""" try: self.degrees = 0.0 except (OSError, RuntimeError) as exc: @@ -246,9 +227,6 @@ def info(self) -> RotatorInfo: """ Return a snapshot of metadata and current angle. - Inputs: - None. - Outputs: RotatorInfo with name, description, port, degrees, and offset. """ @@ -265,9 +243,6 @@ def degrees(self) -> float: """ Get the cached current angle in degrees referenced to the configured offset. - Inputs: - None. - Outputs: Float in [0, 360). """ @@ -280,9 +255,6 @@ def degrees(self, degrees: float) -> None: Inputs: degrees: Target angle referenced to user offset. Wrapped into [0, 360). - - Outputs: - None. Blocks until motion completes when block_while_moving is True. Updates cached angle using device readback. """ target = (degrees + self.offset_degrees) % 360.0 eu = self._deg_to_eu(target) @@ -314,74 +286,8 @@ def degrees(self, degrees: float) -> None: gs1, ) - def move_to(self, angle: float) -> None: - """ - Set the absolute angle in degrees. - - Inputs: - angle: Target angle referenced to user offset. Wrapped into [0, 360). - - Outputs: - None. - """ - self.degrees = angle - - def move_by(self, angle: float) -> None: - """ - Move by a signed delta in degrees relative to the current position. - - Inputs: - angle: Signed delta in degrees. Positive is CW. Negative is CCW. - - Outputs: - None. Blocks until motion completes when block_while_moving is True. Updates cached angle using device readback. - """ - delta_eu = self._deg_to_eu(angle) - dlabel = delta_eu if angle >= 0 else (1 << 32) + (-delta_eu) - cmd = f"{self.addr_hex}mr{int(dlabel):08X}" - gs0 = self._get_status() - t0 = time.time() - self._send(cmd) - logger.info( - "ell14k.move_by tx=%s delta_deg=%.9f delta_eu=%08X status_before=%r", - cmd, - angle, - int(dlabel) & 0xFFFFFFFF, - gs0, - ) - if self.block_while_moving: - self._wait_for_completion() - pos = self._get_position_eu() - gs1 = self._get_status() - if pos is None: - self._degrees = (self._degrees + angle) % 360.0 - logger.warning( - "ell14k.move_by.readback_none fallback_deg=%.9f elapsed=%.3fs status_after=%r", - self._degrees, - time.time() - t0, - gs1, - ) - return - rb = (self._eu_to_deg(pos) - self.offset_degrees) % 360.0 - self._degrees = rb - logger.info( - "ell14k.move_by.readback po_eu=%08X rb_deg=%.9f elapsed=%.3fs status_after=%r", - pos, - rb, - time.time() - t0, - gs1, - ) - def _open_port(self) -> None: - """ - Open and prime the serial link. - - Inputs: - None. - - Outputs: - None. - """ + """Open and prime the serial link.""" logger.info("ell14k.start port=%s req_addr=%s timeout=%.2f", self.hw_address, self.addr_hex, self.timeout_s) self._conn = serial.Serial( self.hw_address, @@ -404,9 +310,6 @@ def _identify(self) -> tuple[dict[str, str], str]: """ Identify the device and resolve the active address. - Inputs: - None. - Outputs: Tuple of (parsed IN fields, confirmed address). """ @@ -454,9 +357,6 @@ def _init_scale(self, parsed: dict[str, str]) -> None: Inputs: parsed: Result of _parse_in. - - Outputs: - None. """ self._travel_deg = int(parsed.get("travel_hex", "168"), 16) if parsed.get("travel_hex") else 360 ppu_hex = parsed.get("pulses_per_unit_hex", "00000000") @@ -476,15 +376,6 @@ def _init_scale(self, parsed: dict[str, str]) -> None: ) def _maybe_home(self) -> None: - """ - Perform an optional home cycle. - - Inputs: - None. - - Outputs: - None. - """ if not self.home_on_start: return dir_code = "0" if self.home_dir_cw else "1" @@ -495,15 +386,7 @@ def _maybe_home(self) -> None: logger.info("ell14k.home.done elapsed=%.3fs", time.time() - t0) def _sync_angle(self) -> None: - """ - Read back position and update cached degrees. - - Inputs: - None. - - Outputs: - None. - """ + """Read back position and update cached degrees.""" pos_eu = self._get_position_eu() if pos_eu is not None: self._degrees = (self._eu_to_deg(pos_eu) - self.offset_degrees) % 360.0 @@ -518,9 +401,6 @@ def _send(self, payload: str) -> None: Inputs: payload: ASCII command excluding terminator. May be empty to send CR only. - - Outputs: - None. """ tx = (payload + "\r").encode("ascii") n = self._conn.write(tx) @@ -530,9 +410,6 @@ def _readline_ascii(self) -> str | None: """ Read one CRLF-terminated line as ASCII and strip line endings. - Inputs: - None. - Outputs: Decoded string or None on timeout. """ @@ -572,9 +449,6 @@ def _drain_reads(self, duration_s: float) -> None: Inputs: duration_s: Seconds to spend draining. - - Outputs: - None. """ t0 = time.time() total = 0 @@ -630,9 +504,6 @@ def _get_position_eu(self) -> int | None: """ Query the device for the current encoder units position. - Inputs: - None. - Outputs: Unsigned 32-bit integer parsed from PO field, or None on failure. """ @@ -658,9 +529,6 @@ def _get_status(self) -> dict[str, str] | None: """ Fetch the raw GS status line. - Inputs: - None. - Outputs: Dict with key 'raw' holding the first reply line, or None on timeout. """ @@ -670,15 +538,7 @@ def _get_status(self) -> dict[str, str] | None: return {"raw": reps[0]} def _wait_for_completion(self) -> None: - """ - Poll until motion stops or the watchdog expires. - - Inputs: - None. - - Outputs: - None. Returns when two successive PO reads are equal or on timeout. - """ + """Poll until motion stops or the watchdog expires.""" t0 = time.time() last_po: int | None = None last_gs: str | None = None From bdf9d534ba1b048325df9150e6315a811ecc2251 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Fri, 31 Oct 2025 15:54:06 -0500 Subject: [PATCH 6/9] Updates based on Benjamin's suggestions --- src/pqnstack/pqn/drivers/rotator.py | 88 +++++++++-------------------- 1 file changed, 28 insertions(+), 60 deletions(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 43e55784..7c136916 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -193,17 +193,17 @@ class ELL14KRotator(RotatorInstrument): _degrees: float = 0.0 _conn: serial.Serial = field(init=False, repr=False) - _ppd: float = field(init=False, default=0.0, repr=False) + _pulses_per_degree: float = field(init=False, default=0.0, repr=False) + _ENCODER_UNITS_PER_DEGREE: float = field(init=False, default=0.0, repr=False) _travel_deg: int = field(init=False, default=360, repr=False) - _raw_in: str = field(init=False, default="", repr=False) + _raw_ident_reply: str = field(init=False, default="", repr=False) - _IN_MIN_PARTS: int = field(init=False, default=9, repr=False) + _IN_MIN_CSV_PARTS: int = field(init=False, default=9, repr=False) _IN_FIXED_MIN_LEN: int = field(init=False, default=30, repr=False) _WAIT_TIMEOUT_S: float = field(init=False, default=30.0, repr=False) _DRAIN_SLEEP_S: float = field(init=False, default=0.005, repr=False) def start(self) -> None: - """Open, identify, scale, and synchronize.""" self._open_port() parsed, addr = self._identify() self.addr_hex = addr @@ -212,7 +212,6 @@ def start(self) -> None: self._sync_angle() def close(self) -> None: - """Return to zero and close the port.""" try: self.degrees = 0.0 except (OSError, RuntimeError) as exc: @@ -224,12 +223,6 @@ def close(self) -> None: @property def info(self) -> RotatorInfo: - """ - Return a snapshot of metadata and current angle. - - Outputs: - RotatorInfo with name, description, port, degrees, and offset. - """ return RotatorInfo( name=self.name, desc=self.desc, @@ -240,24 +233,16 @@ def info(self) -> RotatorInfo: @property def degrees(self) -> float: - """ - Get the cached current angle in degrees referenced to the configured offset. - - Outputs: - Float in [0, 360). - """ return self._degrees @degrees.setter def degrees(self, degrees: float) -> None: - """ - Move to an absolute mechanical angle in degrees. - - Inputs: - degrees: Target angle referenced to user offset. Wrapped into [0, 360). - """ target = (degrees + self.offset_degrees) % 360.0 - eu = self._deg_to_eu(target) + eu = ( + round((target % 360.0) * self._ENCODER_UNITS_PER_DEGREE) % int(self._ENCODER_UNITS_PER_DEGREE * 360.0) + if self._ENCODER_UNITS_PER_DEGREE > 0 + else 0 + ) cmd = f"{self.addr_hex}ma{eu:08X}" gs0 = self._get_status() t0 = time.time() @@ -276,7 +261,10 @@ def degrees(self, degrees: float) -> None: gs1, ) return - rb = (self._eu_to_deg(pos) - self.offset_degrees) % 360.0 + rb = ( + (((pos % int(self._ENCODER_UNITS_PER_DEGREE * 360.0)) / float(self._ENCODER_UNITS_PER_DEGREE)) % 360.0) + - self.offset_degrees + ) % 360.0 self._degrees = rb logger.info( "ell14k.move_to.readback po_eu=%08X rb_deg=%.9f elapsed=%.3fs status_after=%r", @@ -287,7 +275,6 @@ def degrees(self, degrees: float) -> None: ) def _open_port(self) -> None: - """Open and prime the serial link.""" logger.info("ell14k.start port=%s req_addr=%s timeout=%.2f", self.hw_address, self.addr_hex, self.timeout_s) self._conn = serial.Serial( self.hw_address, @@ -333,7 +320,7 @@ def _identify(self) -> tuple[dict[str, str], str]: ) if not line or not line.startswith(f"{a}IN"): continue - self._raw_in = line + self._raw_ident_reply = line try: parsed = self._parse_in(line) except ValueError: @@ -362,17 +349,18 @@ def _init_scale(self, parsed: dict[str, str]) -> None: ppu_hex = parsed.get("pulses_per_unit_hex", "00000000") pulses_val = int(ppu_hex, 16) if ppu_hex else 0 if self._travel_deg > 0 and pulses_val > 0: - self._ppd = float(pulses_val) / float(self._travel_deg) + self._pulses_per_degree = float(pulses_val) / float(self._travel_deg) else: - self._ppd = 262144.0 / 360.0 + self._pulses_per_degree = 262144.0 / 360.0 + self._ENCODER_UNITS_PER_DEGREE = self._pulses_per_degree logger.info( - "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d ppd=%.9f raw_in=%r", + "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d pulses_per_degree=%.9f raw_ident_response=%r", self.addr_hex, self._travel_deg, ppu_hex, pulses_val, - self._ppd, - self._raw_in, + self._pulses_per_degree, + self._raw_ident_reply, ) def _maybe_home(self) -> None: @@ -389,7 +377,13 @@ def _sync_angle(self) -> None: """Read back position and update cached degrees.""" pos_eu = self._get_position_eu() if pos_eu is not None: - self._degrees = (self._eu_to_deg(pos_eu) - self.offset_degrees) % 360.0 + self._degrees = ( + ( + ((pos_eu % int(self._ENCODER_UNITS_PER_DEGREE * 360.0)) / float(self._ENCODER_UNITS_PER_DEGREE)) + % 360.0 + ) + - self.offset_degrees + ) % 360.0 else: self._degrees = 0.0 gs_after = self._get_status() @@ -474,7 +468,7 @@ def _parse_in(self, line: str) -> dict[str, str]: """ if "," in line: parts = line.split(",") - if len(parts) < self._IN_MIN_PARTS: + if len(parts) < self._IN_MIN_CSV_PARTS: msg = f"bad IN csv: {line!r}" raise ValueError(msg) return { @@ -563,29 +557,3 @@ def _wait_for_completion(self) -> None: logger.debug("ell14k.wait status=%r", last_gs) time.sleep(0.05) logger.warning("ell14k.wait timeout") - - def _deg_to_eu(self, deg: float) -> int: - """ - Encode degrees into encoder units using the discovered scale. - - Inputs: - deg: Angle in degrees. Wrapped into [0, 360). - - Outputs: - Unsigned 32-bit integer within one revolution. - """ - return round((deg % 360.0) * self._ppd) % int(self._ppd * 360.0) if self._ppd > 0 else 0 - - def _eu_to_deg(self, eu: int | None) -> float: - """ - Decode encoder units back to degrees using the discovered scale. - - Inputs: - eu: Unsigned 32-bit integer position or None. - - Outputs: - Angle in degrees in [0, 360). Returns 0.0 if scale unknown or input None. - """ - if eu is None or self._ppd <= 0: - return 0.0 - return ((eu % int(self._ppd * 360.0)) / float(self._ppd)) % 360.0 From 732a738971c7548e127df9ccfaf04ba2adabbce3 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Fri, 31 Oct 2025 16:00:25 -0500 Subject: [PATCH 7/9] Remove pulses per degree in favor of encoder units per degree --- src/pqnstack/pqn/drivers/rotator.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 7c136916..707efd88 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -193,7 +193,6 @@ class ELL14KRotator(RotatorInstrument): _degrees: float = 0.0 _conn: serial.Serial = field(init=False, repr=False) - _pulses_per_degree: float = field(init=False, default=0.0, repr=False) _ENCODER_UNITS_PER_DEGREE: float = field(init=False, default=0.0, repr=False) _travel_deg: int = field(init=False, default=360, repr=False) _raw_ident_reply: str = field(init=False, default="", repr=False) @@ -349,17 +348,16 @@ def _init_scale(self, parsed: dict[str, str]) -> None: ppu_hex = parsed.get("pulses_per_unit_hex", "00000000") pulses_val = int(ppu_hex, 16) if ppu_hex else 0 if self._travel_deg > 0 and pulses_val > 0: - self._pulses_per_degree = float(pulses_val) / float(self._travel_deg) + self._ENCODER_UNITS_PER_DEGREE = float(pulses_val) / float(self._travel_deg) else: - self._pulses_per_degree = 262144.0 / 360.0 - self._ENCODER_UNITS_PER_DEGREE = self._pulses_per_degree + self._ENCODER_UNITS_PER_DEGREE = 262144.0 / 360.0 logger.info( - "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d pulses_per_degree=%.9f raw_ident_response=%r", + "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d ENCODER_UNITS_PER_DEGREE=%.9f raw_ident_response=%r", self.addr_hex, self._travel_deg, ppu_hex, pulses_val, - self._pulses_per_degree, + self._ENCODER_UNITS_PER_DEGREE, self._raw_ident_reply, ) From c70269c8e56c13ec20a5111e3d089e2a98ae49fd Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 21:24:13 -0600 Subject: [PATCH 8/9] Add velocity, Reduce more other stuff --- src/pqnstack/pqn/drivers/rotator.py | 193 +++++++++++----------------- 1 file changed, 75 insertions(+), 118 deletions(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 707efd88..9f6c0f92 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -164,51 +164,28 @@ def degrees(self, degrees: float) -> None: @dataclass(slots=True) class ELL14KRotator(RotatorInstrument): - """ - Implement a Thorlabs Elliptec ELL14K rotator over the Elliptec ASCII protocol. - - Inputs: - name: Logical name. - desc: Description. - hw_address: Serial port path. - offset_degrees: Static mechanical zero offset in degrees. - addr_hex: One-hex-digit device address string in [0-9A-F]. If unknown, leave "0". - block_while_moving: If True, motion calls wait until motion stops. - timeout_s: I/O timeout in seconds. - handshake_retries: Maximum discovery passes across candidate addresses. - home_on_start: If True, homes after successful handshake. - home_dir_cw: If True, home clockwise; otherwise counterclockwise. - """ - - name: str - desc: str - hw_address: str - offset_degrees: float = 0.0 addr_hex: str = "0" block_while_moving: bool = True timeout_s: float = 2.5 handshake_retries: int = 3 home_on_start: bool = False - home_dir_cw: bool = True + _ENCODER_UNITS_PER_DEGREE: float = 262144.0 / 360.0 _degrees: float = 0.0 _conn: serial.Serial = field(init=False, repr=False) - _ENCODER_UNITS_PER_DEGREE: float = field(init=False, default=0.0, repr=False) _travel_deg: int = field(init=False, default=360, repr=False) - _raw_ident_reply: str = field(init=False, default="", repr=False) - _IN_MIN_CSV_PARTS: int = field(init=False, default=9, repr=False) - _IN_FIXED_MIN_LEN: int = field(init=False, default=30, repr=False) - _WAIT_TIMEOUT_S: float = field(init=False, default=30.0, repr=False) + _WAIT_TIMEOUT_S: float = field(init=False, default=10.0, repr=False) _DRAIN_SLEEP_S: float = field(init=False, default=0.005, repr=False) def start(self) -> None: + self.parameters.add("velocity_percent") self._open_port() parsed, addr = self._identify() self.addr_hex = addr self._init_scale(parsed) - self._maybe_home() self._sync_angle() + self._maybe_home() def close(self) -> None: try: @@ -237,11 +214,12 @@ def degrees(self) -> float: @degrees.setter def degrees(self, degrees: float) -> None: target = (degrees + self.offset_degrees) % 360.0 - eu = ( - round((target % 360.0) * self._ENCODER_UNITS_PER_DEGREE) % int(self._ENCODER_UNITS_PER_DEGREE * 360.0) - if self._ENCODER_UNITS_PER_DEGREE > 0 - else 0 - ) + if self._ENCODER_UNITS_PER_DEGREE > 0: + eu_full_rev = int(self._ENCODER_UNITS_PER_DEGREE * 360.0) + eu = round((target % 360.0) * self._ENCODER_UNITS_PER_DEGREE) % eu_full_rev + else: + eu = 0 + cmd = f"{self.addr_hex}ma{eu:08X}" gs0 = self._get_status() t0 = time.time() @@ -260,10 +238,8 @@ def degrees(self, degrees: float) -> None: gs1, ) return - rb = ( - (((pos % int(self._ENCODER_UNITS_PER_DEGREE * 360.0)) / float(self._ENCODER_UNITS_PER_DEGREE)) % 360.0) - - self.offset_degrees - ) % 360.0 + + rb = self._angle_from_eu(pos) self._degrees = rb logger.info( "ell14k.move_to.readback po_eu=%08X rb_deg=%.9f elapsed=%.3fs status_after=%r", @@ -273,6 +249,55 @@ def degrees(self, degrees: float) -> None: gs1, ) + @property + def velocity_percent(self) -> int: + reps = self._cmd(f"{self.addr_hex}gv") + if not reps: + msg = "ELL14K velocity query timed out (no reply to gv)" + raise RuntimeError(msg) + rep = reps[0] + idx = rep.find("GV") + if idx < 0 or len(rep) < idx + 4: + msg = f"Bad GV reply: {rep!r}" + raise RuntimeError(msg) + hex_val = rep[idx + 2 : idx + 4] + try: + val = int(hex_val, 16) + except ValueError as exc: + msg = f"Bad GV value {hex_val!r} in reply {rep!r}" + raise RuntimeError(msg) from exc + if not (0 <= val <= 100): # noqa: PLR2004 + logger.warning("ell14k.velocity_percent.readback_out_of_range val=%d rep=%r", val, rep) + return val + + @velocity_percent.setter + def velocity_percent(self, percent: int) -> None: + if not isinstance(percent, int): + msg = "velocity_percent must be an int in [0, 100]" + raise TypeError(msg) + if not (0 <= percent <= 100): # noqa: PLR2004 + msg = "velocity_percent must be in [0, 100]" + raise ValueError(msg) + cmd = f"{self.addr_hex}sv{percent:02X}" + t0 = time.time() + self._send(cmd) + logger.info("ell14k.set_velocity tx=%s percent=%d", cmd, percent) + try: + rb = self.velocity_percent + except RuntimeError as exc: + logger.warning("ell14k.set_velocity.readback_failed err=%r", exc) + return + logger.info( + "ell14k.set_velocity.readback percent_rb=%d elapsed=%.3fs", + rb, + time.time() - t0, + ) + + def _angle_from_eu(self, eu: int) -> float: + eu_full_rev = int(self._ENCODER_UNITS_PER_DEGREE * 360.0) + deg = ((eu % eu_full_rev) / float(self._ENCODER_UNITS_PER_DEGREE)) % 360.0 + return (deg - self.offset_degrees) % 360.0 + def _open_port(self) -> None: logger.info("ell14k.start port=%s req_addr=%s timeout=%.2f", self.hw_address, self.addr_hex, self.timeout_s) self._conn = serial.Serial( @@ -293,12 +318,6 @@ def _open_port(self) -> None: self._drain_reads(0.05) def _identify(self) -> tuple[dict[str, str], str]: - """ - Identify the device and resolve the active address. - - Outputs: - Tuple of (parsed IN fields, confirmed address). - """ addrs = [self.addr_hex.upper()] + [a for a in "0123456789ABCDEF" if a != self.addr_hex.upper()] ok_addr: str | None = None parsed: dict[str, str] | None = None @@ -319,7 +338,6 @@ def _identify(self) -> tuple[dict[str, str], str]: ) if not line or not line.startswith(f"{a}IN"): continue - self._raw_ident_reply = line try: parsed = self._parse_in(line) except ValueError: @@ -338,13 +356,8 @@ def _identify(self) -> tuple[dict[str, str], str]: return parsed, ok_addr def _init_scale(self, parsed: dict[str, str]) -> None: - """ - Initialize travel and pulses-per-degree scale. - - Inputs: - parsed: Result of _parse_in. - """ - self._travel_deg = int(parsed.get("travel_hex", "168"), 16) if parsed.get("travel_hex") else 360 + travel_hex = parsed.get("travel_hex") + self._travel_deg = int(travel_hex, 16) if travel_hex else 360 ppu_hex = parsed.get("pulses_per_unit_hex", "00000000") pulses_val = int(ppu_hex, 16) if ppu_hex else 0 if self._travel_deg > 0 and pulses_val > 0: @@ -352,59 +365,46 @@ def _init_scale(self, parsed: dict[str, str]) -> None: else: self._ENCODER_UNITS_PER_DEGREE = 262144.0 / 360.0 logger.info( - "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d ENCODER_UNITS_PER_DEGREE=%.9f raw_ident_response=%r", + "ell14k.scale addr=%s travel_deg=%d pulses_hex=%s pulses_val=%d ENCODER_UNITS_PER_DEGREE=%.9f", self.addr_hex, self._travel_deg, ppu_hex, pulses_val, self._ENCODER_UNITS_PER_DEGREE, - self._raw_ident_reply, ) def _maybe_home(self) -> None: if not self.home_on_start: return - dir_code = "0" if self.home_dir_cw else "1" + + cur_deg = self.degrees % 360.0 + cw_dist = (360.0 - cur_deg) % 360.0 + ccw_dist = cur_deg + dir_code = "0" if cw_dist <= ccw_dist else "1" + + cmd = f"{self.addr_hex}ho{dir_code}" t0 = time.time() - self._send(f"{self.addr_hex}ho{dir_code}") - logger.info("ell14k.home cmd=%s", f"{self.addr_hex}ho{dir_code}") + self._send(cmd) + logger.info("ell14k.home cmd=%s cur_deg=%.9f cw_dist=%.9f ccw_dist=%.9f", cmd, cur_deg, cw_dist, ccw_dist) self._wait_for_completion() logger.info("ell14k.home.done elapsed=%.3fs", time.time() - t0) + self._sync_angle() def _sync_angle(self) -> None: - """Read back position and update cached degrees.""" pos_eu = self._get_position_eu() if pos_eu is not None: - self._degrees = ( - ( - ((pos_eu % int(self._ENCODER_UNITS_PER_DEGREE * 360.0)) / float(self._ENCODER_UNITS_PER_DEGREE)) - % 360.0 - ) - - self.offset_degrees - ) % 360.0 + self._degrees = self._angle_from_eu(pos_eu) else: self._degrees = 0.0 gs_after = self._get_status() logger.info("ell14k.start.sync degrees=%.9f status_after=%r", self._degrees, gs_after) def _send(self, payload: str) -> None: - """ - Transmit a single Elliptec command with CR termination. - - Inputs: - payload: ASCII command excluding terminator. May be empty to send CR only. - """ tx = (payload + "\r").encode("ascii") n = self._conn.write(tx) logger.debug("ell14k.tx bytes=%d data=%r", n, tx) def _readline_ascii(self) -> str | None: - """ - Read one CRLF-terminated line as ASCII and strip line endings. - - Outputs: - Decoded string or None on timeout. - """ raw = self._conn.readline() if not raw: return None @@ -413,15 +413,6 @@ def _readline_ascii(self) -> str | None: return str(s) def _cmd(self, cmd: str) -> list[str]: - """ - Send one command and collect the immediate reply burst. - - Inputs: - cmd: ASCII command without terminator. - - Outputs: - List of decoded reply lines received before timeout or input drain. - """ self._send(cmd) lines: list[str] = [] t0 = time.time() @@ -436,12 +427,6 @@ def _cmd(self, cmd: str) -> list[str]: return lines def _drain_reads(self, duration_s: float) -> None: - """ - Drain any pending input bytes for a fixed duration. - - Inputs: - duration_s: Seconds to spend draining. - """ t0 = time.time() total = 0 while time.time() - t0 < duration_s: @@ -455,20 +440,8 @@ def _drain_reads(self, duration_s: float) -> None: logger.debug("ell14k.drain bytes=%d", total) def _parse_in(self, line: str) -> dict[str, str]: - """ - Decode an IN identification reply in CSV or fixed-field format. - - Inputs: - line: One IN reply line starting with 'IN'. - - Outputs: - Dict with keys: ell, sn, year, fw, hw, travel_hex, pulses_per_unit_hex. - """ if "," in line: parts = line.split(",") - if len(parts) < self._IN_MIN_CSV_PARTS: - msg = f"bad IN csv: {line!r}" - raise ValueError(msg) return { "ell": parts[2], "sn": parts[3], @@ -479,9 +452,6 @@ def _parse_in(self, line: str) -> dict[str, str]: "pulses_per_unit_hex": parts[8], } data = line[3:] - if len(data) < self._IN_FIXED_MIN_LEN: - msg = f"bad IN fixed: {line!r}" - raise ValueError(msg) return { "ell": data[0:2], "sn": data[2:10], @@ -493,12 +463,6 @@ def _parse_in(self, line: str) -> dict[str, str]: } def _get_position_eu(self) -> int | None: - """ - Query the device for the current encoder units position. - - Outputs: - Unsigned 32-bit integer parsed from PO field, or None on failure. - """ reps = self._cmd(f"{self.addr_hex}gp") if not reps: logger.debug("ell14k.gp no_reply") @@ -518,19 +482,12 @@ def _get_position_eu(self) -> int | None: return val def _get_status(self) -> dict[str, str] | None: - """ - Fetch the raw GS status line. - - Outputs: - Dict with key 'raw' holding the first reply line, or None on timeout. - """ reps = self._cmd(f"{self.addr_hex}gs") if not reps: return None return {"raw": reps[0]} def _wait_for_completion(self) -> None: - """Poll until motion stops or the watchdog expires.""" t0 = time.time() last_po: int | None = None last_gs: str | None = None From 04221687231cf52da8c74194baa2e5e0fea66e10 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 21:25:35 -0600 Subject: [PATCH 9/9] ruff format --- src/pqnstack/pqn/drivers/rotator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pqnstack/pqn/drivers/rotator.py b/src/pqnstack/pqn/drivers/rotator.py index 9f6c0f92..e33f16a6 100644 --- a/src/pqnstack/pqn/drivers/rotator.py +++ b/src/pqnstack/pqn/drivers/rotator.py @@ -266,7 +266,7 @@ def velocity_percent(self) -> int: except ValueError as exc: msg = f"Bad GV value {hex_val!r} in reply {rep!r}" raise RuntimeError(msg) from exc - if not (0 <= val <= 100): # noqa: PLR2004 + if not (0 <= val <= 100): # noqa: PLR2004 logger.warning("ell14k.velocity_percent.readback_out_of_range val=%d rep=%r", val, rep) return val @@ -275,7 +275,7 @@ def velocity_percent(self, percent: int) -> None: if not isinstance(percent, int): msg = "velocity_percent must be an int in [0, 100]" raise TypeError(msg) - if not (0 <= percent <= 100): # noqa: PLR2004 + if not (0 <= percent <= 100): # noqa: PLR2004 msg = "velocity_percent must be in [0, 100]" raise ValueError(msg) cmd = f"{self.addr_hex}sv{percent:02X}"