From 2d12b2dc0639938cbabe580eceeea93caa4ef428 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Fri, 16 May 2025 13:28:53 -0700 Subject: [PATCH] Adding a bunch more type annotations Pyrefly looks like a pretty neat way to check and enforce type annotations, and I also enabled strict checking with Pylance in VSFR ... err, VSCode. Pyrefly type checking could even be enabled as a github workflow if it's not too annoying. https://pyrefly.org/en/docs/installation/#add-pyrefly-to-ci --- radiacode/bytes_buffer.py | 9 +-- radiacode/decoders/databuf.py | 2 +- radiacode/decoders/spectrum.py | 13 ++-- radiacode/radiacode.py | 110 ++++++++++++++++----------------- radiacode/transports/usb.py | 17 ++--- 5 files changed, 77 insertions(+), 74 deletions(-) diff --git a/radiacode/bytes_buffer.py b/radiacode/bytes_buffer.py index 3110c81..f7ffcb4 100644 --- a/radiacode/bytes_buffer.py +++ b/radiacode/bytes_buffer.py @@ -1,4 +1,5 @@ import struct +from typing import Any class BytesBuffer: @@ -13,7 +14,7 @@ class BytesBuffer: data (bytes): The binary data to read from. """ - def __init__(self, data: bytes): + def __init__(self, data: bytes) -> None: """Initialize the BytesBuffer with binary data. Args: @@ -38,7 +39,7 @@ def data(self) -> bytes: """ return self._data[self._pos :] - def unpack(self, fmt: str) -> tuple: + def unpack(self, fmt: str) -> tuple[Any, ...]: """Unpack binary data according to the given format string. Uses the struct module's format syntax to unpack binary data from the @@ -54,7 +55,7 @@ def unpack(self, fmt: str) -> tuple: Raises: Exception: If there isn't enough data remaining in the buffer for the requested format. """ - sz = struct.calcsize(fmt) + sz: int = struct.calcsize(fmt) if self._pos + sz > len(self._data): raise ValueError(f'BytesBuffer: {sz} bytes required for {fmt}, but have only {len(self._data) - self._pos}') self._pos += sz @@ -73,5 +74,5 @@ def unpack_string(self) -> str: Exception: If there isn't enough data in the buffer. UnicodeDecodeError: If the string data cannot be decoded as ASCII. """ - slen = self.unpack('= 7: seq, eid, gid, ts_offset = br.unpack(' list[int]: - ret = [] + ret: list[int] = [] while br.size() > 0: ret.append(br.unpack(' list[int]: - ret = [] + ret: list[int] = [] last = 0 while br.size() > 0: - u16 = br.unpack('> 4) & 0x0FFF - vlen = u16 & 0x0F + u16: int = br.unpack('> 4) & 0x0FFF + vlen: int = u16 & 0x0F + v: int = 0 for _ in range(cnt): if vlen == 0: v = 0 @@ -44,7 +45,7 @@ def decode_RC_VS_SPECTRUM(br: BytesBuffer, format_version: int) -> Spectrum: ts, a0, a1, a2 = br.unpack(' datetime.datetime: return self._base_time def execute(self, reqtype: COMMAND, args: Optional[bytes] = None) -> BytesBuffer: - req_seq_no = 0x80 + self._seq + req_seq_no: int = 0x80 + self._seq self._seq = (self._seq + 1) % 32 - req_header = struct.pack(' BytesBuffer return response def read_request(self, command_id: int | VS | VSFR) -> BytesBuffer: - r = self.execute(COMMAND.RD_VIRT_STRING, struct.pack(' BytesBuffer: return r def write_request(self, command_id: int | VSFR, data: Optional[bytes] = None) -> None: - r = self.execute(COMMAND.WR_VIRT_SFR, struct.pack(' list[int Repeat count is not supported (use "ffff" instead of "4f") as the length of the unpack_format string must equal the number of VSFRs being fetched. """ - nvsfr = len(vsfr_ids) + nvsfr: int = len(vsfr_ids) if nvsfr == 0: raise ValueError('No VSFRs specified') @@ -164,23 +164,23 @@ def batch_read_vsfrs(self, vsfr_ids: list[VSFR], unpack_format: str) -> list[int if not (isinstance(unpack_format, str) and len(unpack_format) == nvsfr): raise ValueError(f'invalid unpack_format `{unpack_format}`') - msg = [struct.pack(' str: - r = self.execute(COMMAND.GET_STATUS) - flags = r.unpack(' None: The time components used are: year, month, day, hour, minute, second. Microseconds are ignored. """ - d = struct.pack(' str: - r = self.execute(COMMAND.FW_SIGNATURE) - signature = r.unpack(' tuple[tuple[int, int, str], tuple[int, int, str]]: @@ -210,11 +210,11 @@ def fw_version(self) -> tuple[tuple[int, int, str], tuple[int, int, str]]: - Boot version: (major, minor, date string) - Target version: (major, minor, date string) """ - r = self.execute(COMMAND.GET_VERSION) + r: BytesBuffer = self.execute(COMMAND.GET_VERSION) boot_minor, boot_major = r.unpack(' str: str: Hardware serial number formatted as hyphen-separated hexadecimal groups (e.g. "12345678-9ABCDEF0") """ - r = self.execute(COMMAND.GET_SERIAL) - serial_len = r.unpack(' str: - r = self.read_request(VS.CONFIGURATION) + r: BytesBuffer = self.read_request(VS.CONFIGURATION) return r.data().decode('cp1251') def text_message(self) -> str: - r = self.read_request(VS.TEXT_MESSAGE) + r: BytesBuffer = self.read_request(VS.TEXT_MESSAGE) return r.data().decode('ascii') def serial_number(self) -> str: @@ -246,11 +246,11 @@ def serial_number(self) -> str: Returns: str: The device serial number as an ASCII string """ - r = self.read_request(VS.SERIAL_NUMBER) + r: BytesBuffer = self.read_request(VS.SERIAL_NUMBER) return r.data().decode('ascii') def commands(self) -> str: - br = self.read_request(VS.SFR_FILE) + br: BytesBuffer = self.read_request(VS.SFR_FILE) return br.data().decode('ascii') # called with 0 after init! @@ -264,7 +264,7 @@ def device_time(self, v: int) -> None: def data_buf(self) -> list[DoseRateDB | RareData | RealTimeData | RawData | Event]: """Get buffered measurement data from the device.""" - r = self.read_request(VS.DATA_BUF) + r: BytesBuffer = self.read_request(VS.DATA_BUF) return decode_VS_DATA_BUF(r, self._base_time) def spectrum(self) -> Spectrum: @@ -273,7 +273,7 @@ def spectrum(self) -> Spectrum: Returns: Spectrum: Object containing the current spectrum data """ - r = self.read_request(VS.SPECTRUM) + r: BytesBuffer = self.read_request(VS.SPECTRUM) return decode_RC_VS_SPECTRUM(r, self._spectrum_format_version) def spectrum_accum(self) -> Spectrum: @@ -282,7 +282,7 @@ def spectrum_accum(self) -> Spectrum: Returns: Spectrum: Object containing the accumulated spectrum data """ - r = self.read_request(VS.SPEC_ACCUM) + r: BytesBuffer = self.read_request(VS.SPEC_ACCUM) return decode_RC_VS_SPECTRUM(r, self._spectrum_format_version) def dose_reset(self) -> None: @@ -298,8 +298,8 @@ def spectrum_reset(self) -> None: This clears the current spectrum data buffer, effectively resetting the spectrum measurement to start fresh. """ - r = self.execute(COMMAND.WR_VIRT_STRING, struct.pack(' list[float]: - a1: Linear term coefficient (keV/channel) - a2: Quadratic term coefficient (keV/channel^2) """ - r = self.read_request(VS.ENERGY_CALIB) + r: BytesBuffer = self.read_request(VS.ENERGY_CALIB) return list(r.unpack(' None: @@ -325,12 +325,12 @@ def set_energy_calib(self, coef: list[float]) -> None: - a2: Quadratic term coefficient (keV/channel^2) """ assert len(coef) == 3 - pc = struct.pack(' None: + def set_language(self, lang: str = 'ru') -> None: """Set the device interface language. Args: @@ -370,7 +370,7 @@ def set_sound_ctrl(self, ctrls: list[CTRL]) -> None: Args: ctrls: List of CTRL enum values specifying which events should trigger sounds """ - flags = 0 + flags: int = 0 for c in ctrls: flags |= int(c) self.write_request(VSFR.SOUND_CTRL, struct.pack(' None: Must be one of: 5, 10, 15, or 30 seconds. """ assert seconds in {5, 10, 15, 30} - v = 3 if seconds == 30 else (seconds // 5) - 1 + v: int = 3 if seconds == 30 else (seconds // 5) - 1 self.write_request(VSFR.DISP_OFF_TIME, struct.pack(' None: @@ -419,7 +419,7 @@ def set_vibro_ctrl(self, ctrls: list[CTRL]) -> None: def get_alarm_limits(self) -> AlarmLimits: "Retrieve the alarm limits" - regs = [ + regs: list[VSFR] = [ VSFR.CR_LEV1_cp10s, VSFR.CR_LEV2_cp10s, VSFR.DR_LEV1_uR_h, @@ -430,10 +430,10 @@ def get_alarm_limits(self) -> AlarmLimits: VSFR.CR_UNITS, ] - resp = self.batch_read_vsfrs(regs, 'I' * len(regs)) + resp: list[int] = self.batch_read_vsfrs(regs, 'I' * len(regs)) - dose_multiplier = 100 if resp[6] else 1 - count_multiplier = 60 if resp[7] else 1 + dose_multiplier: int = 100 if resp[6] else 1 + count_multiplier: int = 60 if resp[7] else 1 return AlarmLimits( l1_count_rate=resp[0] / 10 * count_multiplier, l2_count_rate=resp[1] / 10 * count_multiplier, @@ -483,12 +483,12 @@ def set_alarm_limits( unit will be set to Sv. """ - which_limits = [] - limit_values = [] + which_limits: list[VSFR] = [] + limit_values: list[int] = [] - dose_multiplier = 100 if dose_unit_sv is True else 1 + dose_multiplier: int = 100 if dose_unit_sv is True else 1 if isinstance(count_unit_cpm, bool): - count_multiplier = 1 / 6 if count_unit_cpm else 10 + count_multiplier: float = 1 / 6 if count_unit_cpm else 10 else: count_multiplier = 1 @@ -536,12 +536,12 @@ def set_alarm_limits( which_limits.append(VSFR.CR_UNITS) limit_values.append(int(count_unit_cpm)) - num_to_set = len(which_limits) + num_to_set: int = len(which_limits) if not num_to_set: raise ValueError('No limits specified') - pack_items = [num_to_set] + [int(x) for x in which_limits] + limit_values - pack_format = f' None: self.message = 'Multiple USB Read Failures' if message is None else message super().__init__(self.message) class Usb: - def __init__(self, serial_number=None, timeout_ms=3000): - _vid = 0x0483 - _pid = 0xF123 + def __init__(self, serial_number: str = '', timeout_ms: int = 3000) -> None: + _vid: int = 0x0483 + _pid: int = 0xF123 if serial_number: self._device = usb.core.find(idVendor=_vid, idProduct=_pid, serial_number=serial_number) @@ -39,8 +40,8 @@ def __init__(self, serial_number=None, timeout_ms=3000): def execute(self, request: bytes) -> BytesBuffer: self._device.write(0x1, request) - trials = 0 - max_trials = 3 + trials: int = 0 + max_trials: int = 3 while trials < max_trials: # repeat until non-zero lenght data received data = self._device.read(0x81, 256, timeout=self._timeout_ms).tobytes() if len(data) != 0: @@ -48,9 +49,9 @@ def execute(self, request: bytes) -> BytesBuffer: else: trials += 1 if trials >= max_trials: - raise MultipleUSBReadFailure(str(trials) + ' USB Read Failures in sequence') + raise MultipleUSBReadFailure(f'{trials} USB Read Failures in sequence') - response_length = struct.unpack_from('