From 3a863aff874f1637e35ee5da1c35e33dc3a3faa2 Mon Sep 17 00:00:00 2001 From: Jason Haines Date: Sun, 22 Dec 2024 16:59:00 +0930 Subject: [PATCH 1/6] Refactor - CLI arg parsing. --discover option Optional JSON output --- cli/broadlink_cli | 418 ++++++++++++++++++++++++++++------------------ 1 file changed, 260 insertions(+), 158 deletions(-) diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 7913e332..62093008 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,6 +1,13 @@ #!/usr/bin/env python3 +""" +A command line interface to the broadlink library for executing common actions to control and interact with Broadlink devices +""" + import argparse import base64 +import os +import json +import sys import time from typing import List @@ -9,97 +16,176 @@ from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError from broadlink.remote import data_to_pulses, pulses_to_data -TIMEOUT = 30 +DEFAULT_TIMEOUT = 30 # seconds +DEFAULT_DEVICE = 0x2712 -def auto_int(x): +def auto_int(x) -> int: + """Parse the given value to an integer""" return int(x, 0) +def auto_hex(x) -> bytearray: + """Parse the given hex string to a byte array""" + return bytearray.fromhex(x) + def format_pulses(pulses: List[int]) -> str: - """Format pulses.""" + """Concatentate the list of pulses""" return " ".join( f"+{pulse}" if i % 2 == 0 else f"-{pulse}" for i, pulse in enumerate(pulses) ) - def parse_pulses(data: List[str]) -> List[int]: """Parse pulses.""" return [abs(int(s)) for s in data] +def parse_args() -> object: + """parse and process the commandline arguments""" -parser = argparse.ArgumentParser(fromfile_prefix_chars='@') -parser.add_argument("--device", help="device definition as 'type host mac'") -parser.add_argument("--type", type=auto_int, default=0x2712, help="type of device") -parser.add_argument("--host", help="host address") -parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library") -parser.add_argument("--temperature", action="store_true", help="request temperature from device") -parser.add_argument("--humidity", action="store_true", help="request humidity from device") -parser.add_argument("--energy", action="store_true", help="request energy consumption from device") -parser.add_argument("--check", action="store_true", help="check current power state") -parser.add_argument("--checknl", action="store_true", help="check current nightlight state") -parser.add_argument("--turnon", action="store_true", help="turn on device") -parser.add_argument("--turnoff", action="store_true", help="turn off device") -parser.add_argument("--turnnlon", action="store_true", help="turn on nightlight on the device") -parser.add_argument("--turnnloff", action="store_true", help="turn off nightlight on the device") -parser.add_argument("--switch", action="store_true", help="switch state from on to off and off to on") -parser.add_argument("--send", action="store_true", help="send command") -parser.add_argument("--sensors", action="store_true", help="check all sensors") -parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rflearn", action="store_true", help="rf scan learning") -parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") -parser.add_argument("--learnfile", help="save learned command to a specified file") -parser.add_argument("--durations", action="store_true", - help="use durations in micro seconds instead of the Broadlink format") -parser.add_argument("--convert", action="store_true", help="convert input data to durations") -parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with") -parser.add_argument("data", nargs='*', help="Data to send or convert") -args = parser.parse_args() - -if args.device: - values = args.device.split() - devtype = int(values[0], 0) - host = values[1] - mac = bytearray.fromhex(values[2]) -elif args.mac: - devtype = args.type - host = args.host - mac = bytearray.fromhex(args.mac) - -if args.host or args.device: - dev = broadlink.gendevice(devtype, (host, DEFAULT_PORT), mac) - dev.auth() - -if args.joinwifi: - broadlink.setup(args.joinwifi[0], args.joinwifi[1], 4) - -if args.convert: - data = bytearray.fromhex(''.join(args.data)) - pulses = data_to_pulses(data) - print(format_pulses(pulses)) -if args.temperature: - print(dev.check_temperature()) -if args.humidity: - print(dev.check_humidity()) -if args.energy: - print(dev.get_energy()) -if args.sensors: - data = dev.check_sensors() - for key in data: - print("{} {}".format(key, data[key])) -if args.send: - data = ( - pulses_to_data(parse_pulses(args.data)) - if args.durations - else bytes.fromhex(''.join(args.data)) + parser = argparse.ArgumentParser( + fromfile_prefix_chars="@", + description="Control and interact with Broadlink devices.", + formatter_class=argparse.RawTextHelpFormatter, ) - dev.send_data(data) -if args.learn or (args.learnfile and not args.rflearn): + + action_group = parser.add_argument_group("Actions", "Specify the action to perform.") + action_group.add_argument("--sensors", dest="action", action="store_const", + const="sensors", help="Check all sensors") + action_group.add_argument("--temperature", dest="action", action="store_const", + const="temperature", help="Request temperature from the device") + action_group.add_argument("--humidity", dest="action", action="store_const", + const="humidity", help="Request humidity from the device") + + action_group.add_argument("--energy", dest="action", action="store_const", + const="energy", help="Request energy consumption from the device") + action_group.add_argument("--check", dest="action", action="store_const", + const="check", help="Check current power state") + action_group.add_argument("--checknl", dest="action", action="store_const", + const="checknl", help="Check current nightlight state") + + action_group.add_argument("--turnon", dest="action", action="store_const", + const="turnon", help="Turn on the device") + action_group.add_argument("--turnoff", dest="action", action="store_const", + const="turnoff", help="Turn off the device") + action_group.add_argument("--turnnlon", dest="action", action="store_const", + const="turnnlon", help="Turn on nightlight on the device") + action_group.add_argument("--turnnloff", dest="action", action="store_const", + const="turnnloff", help="Turn off nightlight on the device") + action_group.add_argument("--switch", dest="action", action="store_const", + const="switch", help="Switch state from on to off and off to on") + + action_group.add_argument("--send", dest="action", action="store_const", + const="send", help="Send command") + + action_group.add_argument("--discover", dest="action", action="store_const", + const="discover", help="Discover Broadlink devices on the local network") + action_group.add_argument("--learn", dest="action", action="store_const", + const="irlearn", help="IR learning") + action_group.add_argument("--rflearn", dest="action", action="store_const", + const="rflearn", help="RF scan learning") + + action_group.add_argument("--convert", dest="action", action="store_const", + const="convert", help="Convert input data to durations") + + parser.add_argument("--device", help="Device definition as 'type host mac'") + parser.add_argument("--type", type=auto_int, default=DEFAULT_DEVICE, dest="devtype", + help="Type of device") + parser.add_argument("--host", help="Host address") + parser.add_argument("--mac", type=auto_hex, + help="MAC address (hex reverse), as used by python-broadlink library") + parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Broadlink port") + parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, + help="The timeout period for actions") + + parser.add_argument("--joinwifi", nargs=2, + help="SSID and PASSPHRASE to configure Broadlink device with") + + parser.add_argument("--frequency", type=float, help="Specify radiofrequency for learning") + parser.add_argument("--learnfile", help="Save learned command to a specified file") + + parser.add_argument("--durations", action="store_true", + help="Use durations in microseconds instead of Broadlink format") + parser.add_argument("--json", action="store_true", default=False, + dest="json_out", help="Output in JSON format") + parser.add_argument("data", nargs="*", help="Data to send or convert") + + args = parser.parse_args() + + if args.durations: + args.data = pulses_to_data(parse_pulses(args.data)) + elif args.data: + args.data = bytes.fromhex("".join(args.data)) + + if args.device: + values = args.device.split() + args.devtype = int(values[0], 0) + args.host = values[1] + args.mac = bytearray.fromhex(values[2]) + + if not args.action: + parser.print_help() + print("You must specify an action", file=sys.stderr) + sys.exit(1) + # Validation: Ensure device data is provided for actions that need it + elif args.action not in ["convert", "discover"]: + if not (args.device or (args.devtype and args.host and args.mac)): + parser.error( + f"For --{args.action} you must also specify --device " + "or provide --devtype, --host, and --mac." + ) + + return args + +def format_packet(data: object) -> dict: + return { + "raw": data.hex(), + "base64": base64.b64encode(data).decode("ascii"), + "pulses": format_pulses(data_to_pulses(data)), + } + +def write_learnfile(learnfile: str, durations: bool, packet: dict): + if learnfile: + print(f"Saving to {learnfile}", file=sys.stderr) + with open(learnfile, "w", encoding="utf-8") as text_file: + text_file.write(packet.pulse_fmt if durations else packet.raw_fmt) + + +def print_data(json_out: bool, data: any, sep: str=' '): + if json_out: + print(json.dumps(data, indent=4)) + return + + if isinstance(data, dict): + data = [data] + + for key in data[0].keys(): + print(key, sep, end='') + print() + for item in data: + for key in data[0].keys(): + print(item[key], sep, end='') + print() + +def discover_devices(json_out: bool, ip=None) -> int: + try: + print(f"Scanning { (ip + ' on ') if ip else '' }local network...", file=sys.stderr) + devices = broadlink.discover(local_ip_address=ip) + devices[0] + for device in devices: + print_data(json_out, { "devtype": device.devtype, "host": device.host[0], "mac": device.mac.hex() }) + return 0 + except Exception as e: + print(f"Error in discover_devices: {e}", file=sys.stderr) + return 1 + +def do_learn(json_out: bool, dev: object, learnfile: str, timeout: int, durations: bool) -> int: + """Infrared learning""" + dev.enter_learning() - print("Learning...") + print("Awaiting infrared code...", file=sys.stderr) start = time.time() - while time.time() - start < TIMEOUT: + while time.time() - start < timeout: time.sleep(1) try: data = dev.check_data() @@ -108,93 +194,49 @@ if args.learn or (args.learnfile and not args.rflearn): else: break else: - print("No data received...") - exit(1) - - print("Packet found!") - raw_fmt = data.hex() - base64_fmt = base64.b64encode(data).decode('ascii') - pulse_fmt = format_pulses(data_to_pulses(data)) - - print("Raw:", raw_fmt) - print("Base64:", base64_fmt) - print("Pulses:", pulse_fmt) - - if args.learnfile: - print("Saving to {}".format(args.learnfile)) - with open(args.learnfile, "w") as text_file: - text_file.write(pulse_fmt if args.durations else raw_fmt) -if args.check: - if dev.check_power(): - print('* ON *') - else: - print('* OFF *') -if args.checknl: - if dev.check_nightlight(): - print('* ON *') - else: - print('* OFF *') -if args.turnon: - dev.set_power(True) - if dev.check_power(): - print('== Turned * ON * ==') - else: - print('!! Still OFF !!') -if args.turnoff: - dev.set_power(False) - if dev.check_power(): - print('!! Still ON !!') - else: - print('== Turned * OFF * ==') -if args.turnnlon: - dev.set_nightlight(True) - if dev.check_nightlight(): - print('== Turned * ON * ==') - else: - print('!! Still OFF !!') -if args.turnnloff: - dev.set_nightlight(False) - if dev.check_nightlight(): - print('!! Still ON !!') - else: - print('== Turned * OFF * ==') -if args.switch: - if dev.check_power(): - dev.set_power(False) - print('* Switch to OFF *') - else: - dev.set_power(True) - print('* Switch to ON *') -if args.rflearn: - if args.frequency: - frequency = args.frequency - print("Press the button you want to learn, a short press...") + print("No data received...", file=sys.stderr) + return 1 + + print("Packet found!", file=sys.stderr) + packet = format_packet(data) + print_data(json_out, packet, sep=os.linesep) + write_learnfile(learnfile, durations, packet) + + return 0 + +def do_rflearn(json_out: bool, dev: object, frequency: float, learnfile: str, timeout: int, durations: bool) -> int: + """ + Radiofrequency learning + """ + + if frequency: + print("Press the button you want to learn, a SHORT press...", file=sys.stderr) else: dev.sweep_frequency() - print("Detecting radiofrequency, press and hold the button to learn...") + print("Detecting radiofrequency, press and HOLD the button to learn...", file=sys.stderr) start = time.time() - while time.time() - start < TIMEOUT: + while time.time() - start < timeout: time.sleep(1) locked, frequency = dev.check_frequency() if locked: break else: - print("Radiofrequency not found") + print("Radiofrequency not found", file=sys.stderr) dev.cancel_sweep_frequency() - exit(1) + return 1 - print("Radiofrequency detected: {}MHz".format(frequency)) - print("You can now let go of the button") + print(f"Radiofrequency detected: {frequency}MHz", file=sys.stderr) + print("You can now LET GO of the button", file=sys.stderr) + time.sleep(0.5) input("Press enter to continue...") - - print("Press the button again, now a short press.") + print("Press the button again, now a SHORT press.", file=sys.stderr) dev.find_rf_packet(frequency) start = time.time() - while time.time() - start < TIMEOUT: + while time.time() - start < timeout: time.sleep(1) try: data = dev.check_data() @@ -203,19 +245,79 @@ if args.rflearn: else: break else: - print("No data received...") - exit(1) - - print("Packet found!") - raw_fmt = data.hex() - base64_fmt = base64.b64encode(data).decode('ascii') - pulse_fmt = format_pulses(data_to_pulses(data)) - - print("Raw:", raw_fmt) - print("Base64:", base64_fmt) - print("Pulses:", pulse_fmt) - - if args.learnfile: - print("Saving to {}".format(args.learnfile)) - with open(args.learnfile, "w") as text_file: - text_file.write(pulse_fmt if args.durations else raw_fmt) + print("No data received...", file=sys.stderr) + return 1 + + print("Packet found!", file=sys.stderr) + packet = format_packet(data) + print_data(json_out, packet, sep=os.linesep) + write_learnfile(learnfile, durations, packet) + + +def main() -> int: + """Main function - execute the specified action""" + args = parse_args() + + if args.host and args.mac and args.devtype: + dev = broadlink.gendevice(args.devtype, (args.host, args.port), args.mac) + dev.auth() + + try: + match args.action: + case "temperature": + print_data(args.json_out, { "temperature": dev.check_temperature() }) + case "humidity": + print_data(args.json_out, { "humidity": dev.check_humidity() }) + case "energy": + print_data(args.json_out, { "energy": dev.get_energy()}) + case "sensors": + print_data(args.json_out, dev.check_sensors() ) + case "check": + print_data(args.json_out, { "power": dev.check_power() }) + case "checknl": + print_data(args.json_out, { "nightlight": dev.check_nightlight() }) + + case "turnon": + dev.set_power(True) + print_data(args.json_out, { "power": dev.check_power() }) + case "turnoff": + dev.set_power(False) + print_data(args.json_out, { "power": dev.check_power() }) + case "switch": + dev.set_power(not dev.check_power()) + print_data(args.json_out, { "power": dev.check_nightlight() }) + + case "turnnlon": + dev.set_nightlight(True) + print_data(args.json_out, { "nightlight": dev.check_nightlight() }) + case "turnnloff": + dev.set_nightlight(False) + print_data(args.json_out, { "nightlight": dev.check_nightlight() }) + + case "send": + dev.send_data(args.data) + + case "discover": + return discover_devices(args.json_out, args.host) + case "irlearn": + return do_learn(args.json_out, dev, args.learnfile, args.timeout, args.durations) + case "rflearn": + return do_rflearn(args.json_out, dev, args.frequency, args.learnfile, args.timeout, args.durations) + + case "convert": + print(data_to_pulses(bytearray.fromhex("".join(args.data)))) + + case _: + print(f"Unknown action: {args.action}") + return 255 + except AttributeError as e: + if hasattr(e.obj, 'TYPE'): + print(f"Action {e.name} not supported by device {dev.TYPE} {dev.devtype}") + return 2 + else: + raise + + return 0 + + +sys.exit(main()) From f9c8138412b7d0fa8c220d91a4af6486ad8dbf5f Mon Sep 17 00:00:00 2001 From: Jason Haines Date: Sat, 18 Jan 2025 12:33:41 +0930 Subject: [PATCH 2/6] Add broadlink_cli to deployment package Allow JSON (default) output on all commands Add --learnfile template Fix majority of linter warnings --- cli/broadlink_cli | 307 +++++++++++++++++++++++++++------------------- setup.py | 17 ++- 2 files changed, 195 insertions(+), 129 deletions(-) diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 62093008..40e10a21 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """ -A command line interface to the broadlink library for executing common actions to control and interact with Broadlink devices +A command line interface to the broadlink library for executing common actions to +control and interact with Broadlink devices """ import argparse @@ -29,17 +30,6 @@ def auto_hex(x) -> bytearray: return bytearray.fromhex(x) -def format_pulses(pulses: List[int]) -> str: - """Concatentate the list of pulses""" - return " ".join( - f"+{pulse}" if i % 2 == 0 else f"-{pulse}" - for i, pulse in enumerate(pulses) - ) - -def parse_pulses(data: List[str]) -> List[int]: - """Parse pulses.""" - return [abs(int(s)) for s in data] - def parse_args() -> object: """parse and process the commandline arguments""" @@ -56,7 +46,7 @@ def parse_args() -> object: const="temperature", help="Request temperature from the device") action_group.add_argument("--humidity", dest="action", action="store_const", const="humidity", help="Request humidity from the device") - + action_group.add_argument("--energy", dest="action", action="store_const", const="energy", help="Request energy consumption from the device") action_group.add_argument("--check", dest="action", action="store_const", @@ -78,15 +68,18 @@ def parse_args() -> object: action_group.add_argument("--send", dest="action", action="store_const", const="send", help="Send command") - action_group.add_argument("--discover", dest="action", action="store_const", - const="discover", help="Discover Broadlink devices on the local network") - action_group.add_argument("--learn", dest="action", action="store_const", + action_group.add_argument("--convert", dest="action", action="store_const", + const="convert", help="Convert input data to durations") + + action_group.add_argument("--discover", dest="action", action="store_const", const="discover", + help="Scan the local network to discover Broadlink devices") + action_group.add_argument("--irlearn", "--learn", dest="action", action="store_const", const="irlearn", help="IR learning") action_group.add_argument("--rflearn", dest="action", action="store_const", const="rflearn", help="RF scan learning") - - action_group.add_argument("--convert", dest="action", action="store_const", - const="convert", help="Convert input data to durations") + + parser.add_argument("--frequency", type=float, + help="Specify radiofrequency for --rflean, otherwise scan") parser.add_argument("--device", help="Device definition as 'type host mac'") parser.add_argument("--type", type=auto_int, default=DEFAULT_DEVICE, dest="devtype", @@ -94,6 +87,7 @@ def parse_args() -> object: parser.add_argument("--host", help="Host address") parser.add_argument("--mac", type=auto_hex, help="MAC address (hex reverse), as used by python-broadlink library") + parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Broadlink port") parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="The timeout period for actions") @@ -101,13 +95,20 @@ def parse_args() -> object: parser.add_argument("--joinwifi", nargs=2, help="SSID and PASSPHRASE to configure Broadlink device with") - parser.add_argument("--frequency", type=float, help="Specify radiofrequency for learning") + parser.add_argument("--learntemplate", + help='Specified template for learning and outputing multiple controls ' + '- e.g. --learntemplate \'{ "on" : "raw", "off": "raw" }\'') parser.add_argument("--learnfile", help="Save learned command to a specified file") + parser.add_argument("--output", dest="out_fmt", + choices=['json', 'text', 'raw', 'base64', 'pulses'], + default='json', + help="Specify the output format. 'json' (default), 'text', or a single learned field:" + "'raw' (hex; default Broadlink format), 'base64', 'pulses' (microsecond-format) " + ) + parser.add_argument("--durations", action="store_true", - help="Use durations in microseconds instead of Broadlink format") - parser.add_argument("--json", action="store_true", default=False, - dest="json_out", help="Output in JSON format") + help="Parse [data] durations in microsecond- instead of Broadlink-format") parser.add_argument("data", nargs="*", help="Data to send or convert") args = parser.parse_args() @@ -125,133 +126,185 @@ def parse_args() -> object: if not args.action: parser.print_help() - print("You must specify an action", file=sys.stderr) + print("Error: You must specify an action", file=sys.stderr) sys.exit(1) - # Validation: Ensure device data is provided for actions that need it elif args.action not in ["convert", "discover"]: if not (args.device or (args.devtype and args.host and args.mac)): parser.error( f"For --{args.action} you must also specify --device " "or provide --devtype, --host, and --mac." ) + elif args.action not in ["irlearn", "rflearn"]: + if args.learntemplate: + parser.error( + "--learntemplate can only be used with --learn or --rflearn" + ) + if args.out_fmt not in [ "json", "text" ]: + parser.error( + f"--output {args.out_fmt} can only be used with --learn or --rflearn" + ) + + if args.learntemplate: + if args.out_fmt != "json": + parser.error("--learntemplate can only be used with --output json") + try: + args.learntemplate = json.loads(args.learntemplate) + except json.JSONDecodeError as e: + parser.error(f"--learntemplate is not valid JSON - {e.args[0]}") + + if not isinstance(args.learntemplate, dict): + parser.error("invalid --learntemplate structure - must be a dict") return args +def format_pulses(pulses: List[int]) -> str: + """Concatentate the list of pulses""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) + +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] + +def get_frequency(dev: object, timeout: int): + """Scan to detect RF remote frequency""" + dev.sweep_frequency() + print("Detecting radiofrequency, press and HOLD the button to learn...", file=sys.stderr) + + start = time.time() + while time.time() - start < timeout: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found", file=sys.stderr) + dev.cancel_sweep_frequency() + return 1 + + print(f"Radiofrequency detected: {frequency}MHz", file=sys.stderr) + print("You can now LET GO of the button", file=sys.stderr) + time.sleep(0.5) + + input("Press enter to continue...") + + return frequency + def format_packet(data: object) -> dict: + """Return the broadlink data packet as a well-formated object""" return { "raw": data.hex(), "base64": base64.b64encode(data).decode("ascii"), "pulses": format_pulses(data_to_pulses(data)), } -def write_learnfile(learnfile: str, durations: bool, packet: dict): - if learnfile: - print(f"Saving to {learnfile}", file=sys.stderr) - with open(learnfile, "w", encoding="utf-8") as text_file: - text_file.write(packet.pulse_fmt if durations else packet.raw_fmt) - - -def print_data(json_out: bool, data: any, sep: str=' '): - if json_out: - print(json.dumps(data, indent=4)) - return - - if isinstance(data, dict): - data = [data] - - for key in data[0].keys(): - print(key, sep, end='') - print() - for item in data: - for key in data[0].keys(): - print(item[key], sep, end='') - print() +def get_data(transmitter: str, dev: object, timeout: int, + frequency: float, prompt: str = None) -> object: + """Get data of transmitter type (RF|IR) from the given device-""" -def discover_devices(json_out: bool, ip=None) -> int: - try: - print(f"Scanning { (ip + ' on ') if ip else '' }local network...", file=sys.stderr) - devices = broadlink.discover(local_ip_address=ip) - devices[0] - for device in devices: - print_data(json_out, { "devtype": device.devtype, "host": device.host[0], "mac": device.mac.hex() }) - return 0 - except Exception as e: - print(f"Error in discover_devices: {e}", file=sys.stderr) - return 1 - -def do_learn(json_out: bool, dev: object, learnfile: str, timeout: int, durations: bool) -> int: - """Infrared learning""" + print(f"Awaiting {transmitter} code{ f' for {prompt} button' if prompt else '' }...", + file=sys.stderr, end=None) + if transmitter == "RF": + dev.find_rf_packet(frequency) + else: + dev.enter_learning() - dev.enter_learning() - print("Awaiting infrared code...", file=sys.stderr) start = time.time() while time.time() - start < timeout: time.sleep(1) try: data = dev.check_data() - except (ReadError, StorageError): + except ReadError: + print("_", end='') + continue + except StorageError: + print(".", end='') continue else: break else: - print("No data received...", file=sys.stderr) - return 1 + print() + print(f"No {transmitter} data received...", file=sys.stderr) + return None + print() print("Packet found!", file=sys.stderr) - packet = format_packet(data) - print_data(json_out, packet, sep=os.linesep) - write_learnfile(learnfile, durations, packet) + return format_packet(data) - return 0 +def print_data(out_fmt: str, data: any, sep: str=' '): + """Output the given data in the given format""" + if out_fmt == "json": + print(json.dumps(data, indent=4)) + return -def do_rflearn(json_out: bool, dev: object, frequency: float, learnfile: str, timeout: int, durations: bool) -> int: + if isinstance(data, dict): + data = [data] + + if out_fmt == "text": + for key in data[0].keys(): + print(key, sep, end='') + print() + + for item in data: + if out_fmt == "text": + for key in data[0].keys(): + print(item[key], sep, end='') + print() + else: + print(item[out_fmt]) + +def do_learn(transmitter: str, dev: object, out_fmt: str, learntemplate: str, learnfile: str, + timeout: int, frequency: float = None) -> int: """ - Radiofrequency learning + Learning """ - if frequency: - print("Press the button you want to learn, a SHORT press...", file=sys.stderr) + if transmitter == "RF" and not frequency: + frequency = get_frequency(dev, timeout) + + if learntemplate: + data = {} + for (key, value) in learntemplate.items(): + item_data = get_data(transmitter, dev, timeout, frequency, prompt=key) + if not item_data: + return 1 + data[key] = item_data[value] else: - dev.sweep_frequency() - print("Detecting radiofrequency, press and HOLD the button to learn...", file=sys.stderr) - - start = time.time() - while time.time() - start < timeout: - time.sleep(1) - locked, frequency = dev.check_frequency() - if locked: - break - else: - print("Radiofrequency not found", file=sys.stderr) - dev.cancel_sweep_frequency() - return 1 + data = get_data(transmitter, dev, timeout, frequency) - print(f"Radiofrequency detected: {frequency}MHz", file=sys.stderr) - print("You can now LET GO of the button", file=sys.stderr) - time.sleep(0.5) + print_data(out_fmt, data, sep=os.linesep) - input("Press enter to continue...") - print("Press the button again, now a SHORT press.", file=sys.stderr) + if learnfile: + if out_fmt == 'json': + output = json.dumps(data, indent=4) + elif out_fmt == 'text': + output = " ".join(data.keys()) + os.linesep + " ".join(data.values()) + else: + output = data[out_fmt] - dev.find_rf_packet(frequency) + print(f"Saving {out_fmt} data to {learnfile}", file=sys.stderr) + with open(learnfile, "w", encoding="utf-8") as text_file: + text_file.write(output) - start = time.time() - while time.time() - start < timeout: - time.sleep(1) - try: - data = dev.check_data() - except (ReadError, StorageError): - continue - else: - break - else: - print("No data received...", file=sys.stderr) - return 1 + return 0 - print("Packet found!", file=sys.stderr) - packet = format_packet(data) - print_data(json_out, packet, sep=os.linesep) - write_learnfile(learnfile, durations, packet) +def discover_devices(out_fmt: str, ip=None) -> int: + """Scan local network for Broadlink devices""" + try: + print(f"Scanning { (ip + ' on ') if ip else '' }local network...", + file=sys.stderr) + devices = broadlink.discover(local_ip_address=ip) + for device in devices: + print_data(out_fmt, { + "devtype": device.devtype, + "host": device.host[0], + "mac": device.mac.hex() }) + return 0 + except Exception as e: + print(f"Error in discover_devices: {e}", file=sys.stderr) + return 1 def main() -> int: @@ -265,45 +318,47 @@ def main() -> int: try: match args.action: case "temperature": - print_data(args.json_out, { "temperature": dev.check_temperature() }) + print_data(args.out_fmt, { "temperature": dev.check_temperature() }) case "humidity": - print_data(args.json_out, { "humidity": dev.check_humidity() }) + print_data(args.out_fmt, { "humidity": dev.check_humidity() }) case "energy": - print_data(args.json_out, { "energy": dev.get_energy()}) + print_data(args.out_fmt, { "energy": dev.get_energy()}) case "sensors": - print_data(args.json_out, dev.check_sensors() ) + print_data(args.out_fmt, dev.check_sensors() ) case "check": - print_data(args.json_out, { "power": dev.check_power() }) + print_data(args.out_fmt, { "power": dev.check_power() }) case "checknl": - print_data(args.json_out, { "nightlight": dev.check_nightlight() }) + print_data(args.out_fmt, { "nightlight": dev.check_nightlight() }) case "turnon": dev.set_power(True) - print_data(args.json_out, { "power": dev.check_power() }) + print_data(args.out_fmt, { "power": dev.check_power() }) case "turnoff": dev.set_power(False) - print_data(args.json_out, { "power": dev.check_power() }) + print_data(args.out_fmt, { "power": dev.check_power() }) case "switch": dev.set_power(not dev.check_power()) - print_data(args.json_out, { "power": dev.check_nightlight() }) + print_data(args.out_fmt, { "power": dev.check_nightlight() }) case "turnnlon": dev.set_nightlight(True) - print_data(args.json_out, { "nightlight": dev.check_nightlight() }) + print_data(args.out_fmt, { "nightlight": dev.check_nightlight() }) case "turnnloff": dev.set_nightlight(False) - print_data(args.json_out, { "nightlight": dev.check_nightlight() }) - + print_data(args.out_fmt, { "nightlight": dev.check_nightlight() }) + case "send": dev.send_data(args.data) case "discover": - return discover_devices(args.json_out, args.host) + return discover_devices(args.out_fmt, args.host) case "irlearn": - return do_learn(args.json_out, dev, args.learnfile, args.timeout, args.durations) + return do_learn("IR", dev, args.out_fmt, args.learntemplate, args.learnfile, + args.timeout) case "rflearn": - return do_rflearn(args.json_out, dev, args.frequency, args.learnfile, args.timeout, args.durations) - + return do_learn("RF", dev, args.out_fmt, args.learntemplate, args.learnfile, + args.timeout, args.frequency) + case "convert": print(data_to_pulses(bytearray.fromhex("".join(args.data)))) @@ -315,7 +370,7 @@ def main() -> int: print(f"Action {e.name} not supported by device {dev.TYPE} {dev.devtype}") return 2 else: - raise + raise e return 0 diff --git a/setup.py b/setup.py index 0426f148..0e613c92 100644 --- a/setup.py +++ b/setup.py @@ -1,20 +1,26 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +""" +Setup script for the python-broadlink package. +This script defines the installation requirements and metadata for the package. +To install the package, run: + python setup.py install +""" from setuptools import setup, find_packages -version = '0.19.0' +VERSION = '0.20.0' setup( name="broadlink", - version=version, + version=VERSION, author="Matthew Garrett", author_email="mjg59@srcf.ucam.org", url="http://github.com/mjg59/python-broadlink", packages=find_packages(), - scripts=[], + scripts=["cli/broadlink_cli"], install_requires=["cryptography>=3.2"], description="Python API for controlling Broadlink devices", classifiers=[ @@ -26,4 +32,9 @@ ], include_package_data=True, zip_safe=False, + entry_points={ + "console_scripts": [ + "broadlink-cli=broadlink.cli.broadlink_cli:main", + ], + }, ) From b8d3ca46101fbc73a41aff7d3f870bae57e9a58e Mon Sep 17 00:00:00 2001 From: Jason Haines Date: Sat, 18 Jan 2025 12:45:52 +0930 Subject: [PATCH 3/6] Make --data an arg CLI Send Command (from File) Not Working #194 --- cli/broadlink_cli | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 40e10a21..1a566da0 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -109,7 +109,7 @@ def parse_args() -> object: parser.add_argument("--durations", action="store_true", help="Parse [data] durations in microsecond- instead of Broadlink-format") - parser.add_argument("data", nargs="*", help="Data to send or convert") + parser.add_argument("--data", nargs="*", help="Data to send or convert") args = parser.parse_args() From 3bb333eb333653fffc48a002ddee2aae0e0dc1d9 Mon Sep 17 00:00:00 2001 From: Jason Haines Date: Sat, 18 Jan 2025 12:46:17 +0930 Subject: [PATCH 4/6] Update cli/README.md markdown lint fixes Fix #803 CLI Readme description typo ("rfscanlearn") --- cli/README.md | 92 +++++++++++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 40 deletions(-) diff --git a/cli/README.md b/cli/README.md index b7e48dc9..ea61a82e 100644 --- a/cli/README.md +++ b/cli/README.md @@ -1,138 +1,150 @@ -Command line interface for python-broadlink -=========================================== +# Command line interface for python-broadlink This is a command line interface for the python-broadlink API. +## Requirements -Requirements ------------- You need to install the module first: -``` + +```shell pip3 install broadlink ``` -Installation ------------ -Download "broadlink_cli" and "broadlink_discovery". - +broadlink_cli should then be in your path -Programs --------- -* broadlink_discovery: Discover Broadlink devices connected to the local network. +## Programs * broadlink_cli: Send commands and query the Broadlink device. - -Device specification formats ----------------------------- +## Device specification formats Using separate parameters for each information: -``` + +```shell broadlink_cli --type 0x2712 --host 1.1.1.1 --mac aaaaaaaaaa --temp ``` Using all parameters as a single argument: -``` + +```shell broadlink_cli --device "0x2712 1.1.1.1 aaaaaaaaaa" --temp ``` Using file with parameters: -``` + +```shell broadlink_cli --device @BEDROOM.device --temp ``` + This is prefered as the configuration is stored in a file and you can change it later to point to a different device. -Example usage -------------- +## Example usage ### Common commands #### Join device to the Wi-Fi network -``` + +```shell broadlink_cli --joinwifi SSID PASSWORD ``` #### Discover devices connected to the local network -``` + +```shell broadlink_discovery ``` ### Universal remotes #### Learn IR code and show at console -``` -broadlink_cli --device @BEDROOM.device --learn + +```shell +broadlink_cli --device @BEDROOM.device --irlearn ``` #### Learn RF code and show at console -``` -broadlink_cli --device @BEDROOM.device --rfscanlearn + +```shell +broadlink_cli --device @BEDROOM.device --rflearn ``` #### Learn IR code and save to file -``` + +```shell broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power ``` #### Learn RF code and save to file -``` -broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power + +```shell +broadlink_cli --device @BEDROOM.device --rflearn --learnfile LG-TV.power ``` #### Send code -``` + +```shell broadlink_cli --device @BEDROOM.device --send DATA ``` #### Send code from file -``` + +```shell broadlink_cli --device @BEDROOM.device --send @LG-TV.power ``` #### Check temperature -``` + +```shell broadlink_cli --device @BEDROOM.device --temperature ``` #### Check humidity -``` + +```shell broadlink_cli --device @BEDROOM.device --humidity ``` ### Smart plugs #### Turn on -``` + +```shell broadlink_cli --device @BEDROOM.device --turnon ``` #### Turn off -``` + +```shell broadlink_cli --device @BEDROOM.device --turnoff ``` #### Turn on nightlight -``` + +```shell broadlink_cli --device @BEDROOM.device --turnnlon ``` #### Turn off nightlight -``` + +```shell broadlink_cli --device @BEDROOM.device --turnnloff ``` #### Check power state -``` + +```shell broadlink_cli --device @BEDROOM.device --check ``` #### Check nightlight state -``` + +```shell broadlink_cli --device @BEDROOM.device --checknl ``` #### Check power consumption -``` + +```shell broadlink_cli --device @BEDROOM.device --energy ``` From d340e0f8b09f2e08af155cc1aaae28b0460729df Mon Sep 17 00:00:00 2001 From: Jason Haines Date: Thu, 23 Jan 2025 08:02:36 +0930 Subject: [PATCH 5/6] Additional print(file=sys.stderr) --- cli/broadlink_cli | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 1a566da0..697d47c0 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -225,11 +225,11 @@ def get_data(transmitter: str, dev: object, timeout: int, else: break else: - print() + print(file=sys.stderr) print(f"No {transmitter} data received...", file=sys.stderr) return None - print() + print(file=sys.stderr) print("Packet found!", file=sys.stderr) return format_packet(data) @@ -366,8 +366,8 @@ def main() -> int: print(f"Unknown action: {args.action}") return 255 except AttributeError as e: - if hasattr(e.obj, 'TYPE'): - print(f"Action {e.name} not supported by device {dev.TYPE} {dev.devtype}") + if hasattr(e.obj, "TYPE"): + print(f"Action {e.name} not supported by device {dev.TYPE} {dev.devtype}", file=sys.stderr) return 2 else: raise e From ff90207f62f0cd2f83126b6ee68c791035baf524 Mon Sep 17 00:00:00 2001 From: Jason Haines Date: Thu, 23 Jan 2025 08:02:47 +0930 Subject: [PATCH 6/6] black code formatting broadlink_cli --- cli/broadlink_cli | 344 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 243 insertions(+), 101 deletions(-) diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 697d47c0..a572c8e7 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -17,7 +17,7 @@ from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError from broadlink.remote import data_to_pulses, pulses_to_data -DEFAULT_TIMEOUT = 30 # seconds +DEFAULT_TIMEOUT = 30 # seconds DEFAULT_DEVICE = 0x2712 @@ -25,6 +25,7 @@ def auto_int(x) -> int: """Parse the given value to an integer""" return int(x, 0) + def auto_hex(x) -> bytearray: """Parse the given hex string to a byte array""" return bytearray.fromhex(x) @@ -39,76 +40,180 @@ def parse_args() -> object: formatter_class=argparse.RawTextHelpFormatter, ) - action_group = parser.add_argument_group("Actions", "Specify the action to perform.") - action_group.add_argument("--sensors", dest="action", action="store_const", - const="sensors", help="Check all sensors") - action_group.add_argument("--temperature", dest="action", action="store_const", - const="temperature", help="Request temperature from the device") - action_group.add_argument("--humidity", dest="action", action="store_const", - const="humidity", help="Request humidity from the device") - - action_group.add_argument("--energy", dest="action", action="store_const", - const="energy", help="Request energy consumption from the device") - action_group.add_argument("--check", dest="action", action="store_const", - const="check", help="Check current power state") - action_group.add_argument("--checknl", dest="action", action="store_const", - const="checknl", help="Check current nightlight state") - - action_group.add_argument("--turnon", dest="action", action="store_const", - const="turnon", help="Turn on the device") - action_group.add_argument("--turnoff", dest="action", action="store_const", - const="turnoff", help="Turn off the device") - action_group.add_argument("--turnnlon", dest="action", action="store_const", - const="turnnlon", help="Turn on nightlight on the device") - action_group.add_argument("--turnnloff", dest="action", action="store_const", - const="turnnloff", help="Turn off nightlight on the device") - action_group.add_argument("--switch", dest="action", action="store_const", - const="switch", help="Switch state from on to off and off to on") - - action_group.add_argument("--send", dest="action", action="store_const", - const="send", help="Send command") - - action_group.add_argument("--convert", dest="action", action="store_const", - const="convert", help="Convert input data to durations") - - action_group.add_argument("--discover", dest="action", action="store_const", const="discover", - help="Scan the local network to discover Broadlink devices") - action_group.add_argument("--irlearn", "--learn", dest="action", action="store_const", - const="irlearn", help="IR learning") - action_group.add_argument("--rflearn", dest="action", action="store_const", - const="rflearn", help="RF scan learning") - - parser.add_argument("--frequency", type=float, - help="Specify radiofrequency for --rflean, otherwise scan") + action_group = parser.add_argument_group( + "Actions", "Specify the action to perform." + ) + action_group.add_argument( + "--sensors", + dest="action", + action="store_const", + const="sensors", + help="Check all sensors", + ) + action_group.add_argument( + "--temperature", + dest="action", + action="store_const", + const="temperature", + help="Request temperature from the device", + ) + action_group.add_argument( + "--humidity", + dest="action", + action="store_const", + const="humidity", + help="Request humidity from the device", + ) + + action_group.add_argument( + "--energy", + dest="action", + action="store_const", + const="energy", + help="Request energy consumption from the device", + ) + action_group.add_argument( + "--check", + dest="action", + action="store_const", + const="check", + help="Check current power state", + ) + action_group.add_argument( + "--checknl", + dest="action", + action="store_const", + const="checknl", + help="Check current nightlight state", + ) + + action_group.add_argument( + "--turnon", + dest="action", + action="store_const", + const="turnon", + help="Turn on the device", + ) + action_group.add_argument( + "--turnoff", + dest="action", + action="store_const", + const="turnoff", + help="Turn off the device", + ) + action_group.add_argument( + "--turnnlon", + dest="action", + action="store_const", + const="turnnlon", + help="Turn on nightlight on the device", + ) + action_group.add_argument( + "--turnnloff", + dest="action", + action="store_const", + const="turnnloff", + help="Turn off nightlight on the device", + ) + action_group.add_argument( + "--switch", + dest="action", + action="store_const", + const="switch", + help="Switch state from on to off and off to on", + ) + + action_group.add_argument( + "--send", dest="action", action="store_const", const="send", help="Send command" + ) + + action_group.add_argument( + "--convert", + dest="action", + action="store_const", + const="convert", + help="Convert input data to durations", + ) + + action_group.add_argument( + "--discover", + dest="action", + action="store_const", + const="discover", + help="Scan the local network to discover Broadlink devices", + ) + action_group.add_argument( + "--irlearn", + "--learn", + dest="action", + action="store_const", + const="irlearn", + help="IR learning", + ) + action_group.add_argument( + "--rflearn", + dest="action", + action="store_const", + const="rflearn", + help="RF scan learning", + ) + + parser.add_argument( + "--frequency", + type=float, + help="Specify radiofrequency for --rflean, otherwise scan", + ) parser.add_argument("--device", help="Device definition as 'type host mac'") - parser.add_argument("--type", type=auto_int, default=DEFAULT_DEVICE, dest="devtype", - help="Type of device") + parser.add_argument( + "--type", + type=auto_int, + default=DEFAULT_DEVICE, + dest="devtype", + help="Type of device", + ) parser.add_argument("--host", help="Host address") - parser.add_argument("--mac", type=auto_hex, - help="MAC address (hex reverse), as used by python-broadlink library") + parser.add_argument( + "--mac", + type=auto_hex, + help="MAC address (hex reverse), as used by python-broadlink library", + ) parser.add_argument("--port", type=int, default=DEFAULT_PORT, help="Broadlink port") - parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, - help="The timeout period for actions") + parser.add_argument( + "--timeout", + type=int, + default=DEFAULT_TIMEOUT, + help="The timeout period for actions", + ) - parser.add_argument("--joinwifi", nargs=2, - help="SSID and PASSPHRASE to configure Broadlink device with") + parser.add_argument( + "--joinwifi", + nargs=2, + help="SSID and PASSPHRASE to configure Broadlink device with", + ) - parser.add_argument("--learntemplate", - help='Specified template for learning and outputing multiple controls ' - '- e.g. --learntemplate \'{ "on" : "raw", "off": "raw" }\'') + parser.add_argument( + "--learntemplate", + help="Specified template for learning and outputing multiple controls " + '- e.g. --learntemplate \'{ "on" : "raw", "off": "raw" }\'', + ) parser.add_argument("--learnfile", help="Save learned command to a specified file") - parser.add_argument("--output", dest="out_fmt", - choices=['json', 'text', 'raw', 'base64', 'pulses'], - default='json', + parser.add_argument( + "--output", + dest="out_fmt", + choices=["json", "text", "raw", "base64", "pulses"], + default="json", help="Specify the output format. 'json' (default), 'text', or a single learned field:" - "'raw' (hex; default Broadlink format), 'base64', 'pulses' (microsecond-format) " + "'raw' (hex; default Broadlink format), 'base64', 'pulses' (microsecond-format) ", ) - parser.add_argument("--durations", action="store_true", - help="Parse [data] durations in microsecond- instead of Broadlink-format") + parser.add_argument( + "--durations", + action="store_true", + help="Parse [data] durations in microsecond- instead of Broadlink-format", + ) parser.add_argument("--data", nargs="*", help="Data to send or convert") args = parser.parse_args() @@ -136,10 +241,8 @@ def parse_args() -> object: ) elif args.action not in ["irlearn", "rflearn"]: if args.learntemplate: - parser.error( - "--learntemplate can only be used with --learn or --rflearn" - ) - if args.out_fmt not in [ "json", "text" ]: + parser.error("--learntemplate can only be used with --learn or --rflearn") + if args.out_fmt not in ["json", "text"]: parser.error( f"--output {args.out_fmt} can only be used with --learn or --rflearn" ) @@ -157,21 +260,26 @@ def parse_args() -> object: return args + def format_pulses(pulses: List[int]) -> str: """Concatentate the list of pulses""" return " ".join( - f"+{pulse}" if i % 2 == 0 else f"-{pulse}" - for i, pulse in enumerate(pulses) + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" for i, pulse in enumerate(pulses) ) + def parse_pulses(data: List[str]) -> List[int]: """Parse pulses.""" return [abs(int(s)) for s in data] + def get_frequency(dev: object, timeout: int): """Scan to detect RF remote frequency""" dev.sweep_frequency() - print("Detecting radiofrequency, press and HOLD the button to learn...", file=sys.stderr) + print( + "Detecting radiofrequency, press and HOLD the button to learn...", + file=sys.stderr, + ) start = time.time() while time.time() - start < timeout: @@ -192,6 +300,7 @@ def get_frequency(dev: object, timeout: int): return frequency + def format_packet(data: object) -> dict: """Return the broadlink data packet as a well-formated object""" return { @@ -200,12 +309,17 @@ def format_packet(data: object) -> dict: "pulses": format_pulses(data_to_pulses(data)), } -def get_data(transmitter: str, dev: object, timeout: int, - frequency: float, prompt: str = None) -> object: + +def get_data( + transmitter: str, dev: object, timeout: int, frequency: float, prompt: str = None +) -> object: """Get data of transmitter type (RF|IR) from the given device-""" - print(f"Awaiting {transmitter} code{ f' for {prompt} button' if prompt else '' }...", - file=sys.stderr, end=None) + print( + f"Awaiting {transmitter} code{ f' for {prompt} button' if prompt else '' }...", + file=sys.stderr, + end=None, + ) if transmitter == "RF": dev.find_rf_packet(frequency) else: @@ -217,10 +331,10 @@ def get_data(transmitter: str, dev: object, timeout: int, try: data = dev.check_data() except ReadError: - print("_", end='') + print("_", end="") continue except StorageError: - print(".", end='') + print(".", end="") continue else: break @@ -233,7 +347,8 @@ def get_data(transmitter: str, dev: object, timeout: int, print("Packet found!", file=sys.stderr) return format_packet(data) -def print_data(out_fmt: str, data: any, sep: str=' '): + +def print_data(out_fmt: str, data: any, sep: str = " "): """Output the given data in the given format""" if out_fmt == "json": print(json.dumps(data, indent=4)) @@ -244,19 +359,27 @@ def print_data(out_fmt: str, data: any, sep: str=' '): if out_fmt == "text": for key in data[0].keys(): - print(key, sep, end='') + print(key, sep, end="") print() for item in data: if out_fmt == "text": for key in data[0].keys(): - print(item[key], sep, end='') + print(item[key], sep, end="") print() else: print(item[out_fmt]) -def do_learn(transmitter: str, dev: object, out_fmt: str, learntemplate: str, learnfile: str, - timeout: int, frequency: float = None) -> int: + +def do_learn( + transmitter: str, + dev: object, + out_fmt: str, + learntemplate: str, + learnfile: str, + timeout: int, + frequency: float = None, +) -> int: """ Learning """ @@ -266,7 +389,7 @@ def do_learn(transmitter: str, dev: object, out_fmt: str, learntemplate: str, le if learntemplate: data = {} - for (key, value) in learntemplate.items(): + for key, value in learntemplate.items(): item_data = get_data(transmitter, dev, timeout, frequency, prompt=key) if not item_data: return 1 @@ -277,9 +400,9 @@ def do_learn(transmitter: str, dev: object, out_fmt: str, learntemplate: str, le print_data(out_fmt, data, sep=os.linesep) if learnfile: - if out_fmt == 'json': + if out_fmt == "json": output = json.dumps(data, indent=4) - elif out_fmt == 'text': + elif out_fmt == "text": output = " ".join(data.keys()) + os.linesep + " ".join(data.values()) else: output = data[out_fmt] @@ -290,17 +413,23 @@ def do_learn(transmitter: str, dev: object, out_fmt: str, learntemplate: str, le return 0 + def discover_devices(out_fmt: str, ip=None) -> int: """Scan local network for Broadlink devices""" try: - print(f"Scanning { (ip + ' on ') if ip else '' }local network...", - file=sys.stderr) + print( + f"Scanning { (ip + ' on ') if ip else '' }local network...", file=sys.stderr + ) devices = broadlink.discover(local_ip_address=ip) for device in devices: - print_data(out_fmt, { - "devtype": device.devtype, - "host": device.host[0], - "mac": device.mac.hex() }) + print_data( + out_fmt, + { + "devtype": device.devtype, + "host": device.host[0], + "mac": device.mac.hex(), + }, + ) return 0 except Exception as e: print(f"Error in discover_devices: {e}", file=sys.stderr) @@ -318,34 +447,34 @@ def main() -> int: try: match args.action: case "temperature": - print_data(args.out_fmt, { "temperature": dev.check_temperature() }) + print_data(args.out_fmt, {"temperature": dev.check_temperature()}) case "humidity": - print_data(args.out_fmt, { "humidity": dev.check_humidity() }) + print_data(args.out_fmt, {"humidity": dev.check_humidity()}) case "energy": - print_data(args.out_fmt, { "energy": dev.get_energy()}) + print_data(args.out_fmt, {"energy": dev.get_energy()}) case "sensors": - print_data(args.out_fmt, dev.check_sensors() ) + print_data(args.out_fmt, dev.check_sensors()) case "check": - print_data(args.out_fmt, { "power": dev.check_power() }) + print_data(args.out_fmt, {"power": dev.check_power()}) case "checknl": - print_data(args.out_fmt, { "nightlight": dev.check_nightlight() }) + print_data(args.out_fmt, {"nightlight": dev.check_nightlight()}) case "turnon": dev.set_power(True) - print_data(args.out_fmt, { "power": dev.check_power() }) + print_data(args.out_fmt, {"power": dev.check_power()}) case "turnoff": dev.set_power(False) - print_data(args.out_fmt, { "power": dev.check_power() }) + print_data(args.out_fmt, {"power": dev.check_power()}) case "switch": dev.set_power(not dev.check_power()) - print_data(args.out_fmt, { "power": dev.check_nightlight() }) + print_data(args.out_fmt, {"power": dev.check_nightlight()}) case "turnnlon": dev.set_nightlight(True) - print_data(args.out_fmt, { "nightlight": dev.check_nightlight() }) + print_data(args.out_fmt, {"nightlight": dev.check_nightlight()}) case "turnnloff": dev.set_nightlight(False) - print_data(args.out_fmt, { "nightlight": dev.check_nightlight() }) + print_data(args.out_fmt, {"nightlight": dev.check_nightlight()}) case "send": dev.send_data(args.data) @@ -353,11 +482,24 @@ def main() -> int: case "discover": return discover_devices(args.out_fmt, args.host) case "irlearn": - return do_learn("IR", dev, args.out_fmt, args.learntemplate, args.learnfile, - args.timeout) + return do_learn( + "IR", + dev, + args.out_fmt, + args.learntemplate, + args.learnfile, + args.timeout, + ) case "rflearn": - return do_learn("RF", dev, args.out_fmt, args.learntemplate, args.learnfile, - args.timeout, args.frequency) + return do_learn( + "RF", + dev, + args.out_fmt, + args.learntemplate, + args.learnfile, + args.timeout, + args.frequency, + ) case "convert": print(data_to_pulses(bytearray.fromhex("".join(args.data))))