diff --git a/API.md b/API.md index 7f12285..02da31e 100644 --- a/API.md +++ b/API.md @@ -233,11 +233,14 @@ It is best to use event handlers to access configuration data as then you are as __isConnected()__ which tells you if this NodeServer and Polyglot are connected via MQTT. -__addNode(node, conn_status = None)__ Adds a new node to Polyglot. You fist need to instantiate a +__addNode(node, conn_status = None, rename = False)__ Adds a new node to Polyglot. You fist need to instantiate a node using your custom class, which you then pass to addNode. Return value is the node passed in. -If conn\_status is set to a driver string, this node and that driver specified will be used by PG3 to represent the connection status (0 = disconnected, 1 = connected, 2 = failed). By default, conn\_status is None. +If conn\_status is set to a driver string, this node and the driver specified will be used by PG3 to represent the connection status (0 = disconnected, 1 = connected, 2 = failed). By default, conn\_status is None. When the node server connects via MQTT, PG3 will send a value of 1 to the ISY/IoP for that node/driver. When PG3 stops the node server it will send a value of 0 to the ISY/IoP for that node/driver. If the MQTT connection drops for any other reason, PG3 will send a value of 2 to the ISY/IoP for that node/driver. Node server should use uom 25 to display status correctly. + +If rename is set to True, addNode will attempt to rename the node on the ISY and in the PG3 database to match the name specified in node. When rename is set to False, the name of the node on the ISY and in PG3's database record are unchanged. + Notes: 1. Only Node class common information is stored in the database, not your @@ -248,7 +251,6 @@ Notes: a list of nodes available in the onConfig handler. However, you should still call __addNode()__ for each node to replace the generic node object with your custom node class object. -``` __getConfig()__ Returns a copy of the last config received. @@ -256,7 +258,8 @@ __getNodes()__ Returns your list of nodes. The interface will attempt to wrap the list of nodes from Polyglot with your custom classes. But this can fail if your custom class needs additional parameters when creating the class object. Your node server should call addNode() to make sure the objects on -this list are your custom class objects. +this list are your custom class objects. Note that the behavior is undefined +if you try to modify the list while iterating it. __getNode(address)__ Returns a single node. diff --git a/udi_interface/__init__.py b/udi_interface/__init__.py index e3cd0ba..d5ea806 100755 --- a/udi_interface/__init__.py +++ b/udi_interface/__init__.py @@ -7,7 +7,7 @@ from .custom import Custom from .isy import ISY -__version__ = '3.0.47' +__version__ = '3.0.51' __description__ = 'UDI Python Interface for Polyglot version 3' __url__ = 'https://github.com/UniversalDevicesInc/udi_python_interface' __author__ = 'Universal Devices Inc.' diff --git a/udi_interface/interface.py b/udi_interface/interface.py index d39b213..6a5fee8 100644 --- a/udi_interface/interface.py +++ b/udi_interface/interface.py @@ -7,7 +7,7 @@ import logging import markdown2 import os -from os.path import join, expanduser +from os.path import join, expanduser, exists import paho.mqtt.client as mqtt try: import queue @@ -70,6 +70,17 @@ class pub(object): cfg_threads = [] topic_data = [] + ''' + when called, this adds a callback, address pair to a topic. The 'topics' + dictionary will look like: + topics = { + config: [(callback, address), (callback, address)], + start: [(callback, address), (callback, address)], + } + + When something subscribes, send any previous published events for the + topic to the subscriber. I.E. backlog events. + ''' @staticmethod def subscribe(topic, callback, address): if int(topic) >= len(pub.topic_list): @@ -80,26 +91,44 @@ def subscribe(topic, callback, address): else: pub.topics[pub.topic_list[topic]].append([callback, address]) - # QUESTION: Should this publish any existing info to the subscriber? + # Send backlog events if any for item in pub.topic_data: - if item[0] == topic and item[1] == address: + if item[0] == topic and (address == None or item[1] == address): Thread(target=callback, args=item[2:]).start() + ''' + when we publish an event, we first push the event on to the backlog + queue so that any later subscribers will get the event when they + subscribe. + ''' @staticmethod def publish(topic, address, *argv): pub.topic_data.append([topic, address, *argv]) + + # check if anyone has subscribed to this event if pub.topic_list[topic] in pub.topics: + # loop through all of the subscribers for item in pub.topics[pub.topic_list[topic]]: - if item[1] == address: + ''' + With the exception of the START event, all others are + published with address == None. + + For the START event we want to check the subscribers + address filter value (item[1]) and compare it with the + address in the event. For all other events we don't + really care. + ''' + if address == None or item[1] == address: Thread(target=item[0], args=[*argv]).start() @staticmethod def publish_nt(topic, address, *argv): pub.topic_data.append([topic, address, *argv]) + if pub.topic_list[topic] in pub.topics: for item in pub.topics[pub.topic_list[topic]]: - if item[1] == address: + if address == None or item[1] == address: t = Thread(target=item[0], args=[*argv]) pub.cfg_threads.append(t) t.start() @@ -107,9 +136,10 @@ def publish_nt(topic, address, *argv): @staticmethod def publish_wait(topic, address, *argv): pub.topic_data.append([topic, address, *argv]) + if pub.topic_list[topic] in pub.topics: for item in pub.topics[pub.topic_list[topic]]: - if item[1] == address: + if address == None or item[1] == address: Thread(target=pub.waitForFinish, args=[item[0], *argv]).start() @@ -196,20 +226,16 @@ def __init__(self, classes, envVar=None): self._threads['input'] = Thread( target=self._parseInput, name='Command') self._mqttc = mqtt.Client(self.id, True) - self._mqttc.username_pw_set(self.id, self.pg3init['token']) self._mqttc.on_connect = self._connect self._mqttc.on_message = self._message self._mqttc.on_subscribe = self._subscribe self._mqttc.on_disconnect = self._disconnect self._mqttc.on_publish = self._publish self._mqttc.on_log = self._log + self.using_mosquitto = True self.useSecure = True self._nodes = {} self.nodes_internal = {} - if self.pg3init['secure'] == 1: - self.sslContext = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - self.sslContext.check_hostname = False - self._mqttc.tls_set_context(self.sslContext) self.loop = None self.inQueue = queue.Queue() self.isyVersion = None @@ -269,6 +295,14 @@ def _connect(self, mqttc, userdata, flags, rc): results = [] LOGGER.info("MQTT Connected with result code " + str(rc) + " (Success)") + + # Publish connection status and set up will + if self.using_mosquitto: + connected = {"connected": [{}]} + self._mqttc.publish('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(connected), retain=True) + failed = {"disconnected": [{}]} + self._mqttc.will_set('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(failed), qos=0, retain=True) + results.append((self.topicInput, tuple( self._mqttc.subscribe(self.topicInput)))) for (topic, (result, mid)) in results: @@ -380,7 +414,16 @@ def _message(self, mqttc, userdata, msg): elif key == 'setLogLevelList': LOGGER.info('setLogList response {}'.format(parsed_msg[key])) elif key == 'renamenode': + # [{'address': 'addr_0001', 'name': 'today', 'success': True}] LOGGER.info('renamenode response {}'.format(parsed_msg[key])) + for resp in parsed_msg[key]: + try: + if resp['success']: + addr = resp['address'] + self.nodes_internal[addr].name = resp['name'] + except Exception as error: + LOGGER.error('Failed to update internal nodelist: {} :: {}'.format(resp, error)) + else: LOGGER.error( 'Invalid command received in message from PG3: \'{}\' {}'.format(key, parsed_msg[key])) @@ -445,6 +488,34 @@ def _startMqtt(self): """ LOGGER.info('Connecting to MQTT... {}:{}'.format( self._server, self._port)) + + self.username = self.id.replace(':', '') + + # Load the client SSL certificate. What if this fails? + if self.pg3init['secure'] == 1: + self.sslContext = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + self.sslContext.check_hostname = False + cert = self.username + ".cert" + key = self.username + ".key" + + # only if certs exist! + if exists(cert) and exists(key): + LOGGER.info('Using SSL certs: {} {}'.format(cert, key)) + self.sslContext.load_cert_chain(cert, key) + self.using_mosquitto = True + else: + self.username = self.id + self.using_mosquitto = False + + self._mqttc.tls_set_context(self.sslContext) + + self._mqttc.username_pw_set(self.username, self.pg3init['token']) + + if self.using_mosquitto: + # Set up the will, do we need this here? + failed = {"disconnected": [{}]} + self._mqttc.will_set('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(failed), qos=0, retain=True) + done = False while not done: try: @@ -467,13 +538,8 @@ def _startMqtt(self): def _get_server_data(self): """ _get_server_data: Loads the server.json and returns as a dict - :param check_profile: Calls the check_profile method if True - - If profile_version in json is null then profile will be loaded on - every restart. - """ - self.serverdata = {'version': 'unknown'} + self.serverdata = {'version': '0.0.0', 'profile_version': 'NotDefined'} # Read the SERVER info from the json. try: @@ -481,8 +547,14 @@ def _get_server_data(self): self.serverdata = json.load(data) data.close() except Exception as err: - LOGGER.error('get_server_data: failed to read file {0}: {1}'.format( - Interface.SERVER_JSON_FILE_NAME, err), exc_info=True) + """ + Failure to load the server.json file is no longer an error. + The only things that may be used from the server.json file are + the version number (as a fallback if start isn't called with one) + and the profile_version for checkProfile(). + """ + LOGGER.warning('get_server_data: failed to read file {0}: {1}'.format( + Interface.SERVER_JSON_FILE_NAME, err), exc_info=False) return # Get the version info @@ -512,6 +584,8 @@ def stop(self): LOGGER.info('Disconnecting from MQTT... {}:{}'.format( self._server, self._port)) self._mqttc.loop_stop() + disconnect = {"disconnected": [{}]} + self._mqttc.publish('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(disconnect), retain=True) self._mqttc.disconnect() def send(self, message, type): @@ -896,6 +970,8 @@ def start(self, version=None): # Tell PG3 our version if version: self.serverdata['version'] = version + else: + LOGGER.warning('No node server version specified. Using deprecated server.json version') def ready(self): @@ -912,16 +988,12 @@ def isConnected(self): """ Tells you if this nodeserver and Polyglot are connected via MQTT """ return self.connected - def addNode(self, node, conn_status=None): + def addNode(self, node, conn_status=None, rename=False): """ Add a node to the NodeServer :param node: Dictionary of node settings. Keys: address, name, node_def_id, primary, and drivers are required. """ - if node.address in self._nodes and self._nodes[node.address]['name'] != node.name: - LOGGER.warning("addNode(): Cannot be used to change the node's name") - node.name = self._nodes[node.address]['name'] - LOGGER.info('Adding node {}({}) [{}]'.format(node.name, node.address, node.private)) message = { 'addnode': [{ @@ -931,7 +1003,8 @@ def addNode(self, node, conn_status=None): 'primaryNode': node.primary, 'drivers': node.drivers, 'hint': node.hint, - 'private': node.private + 'private': node.private, + 'rename': rename }] } self.send(message, 'command') @@ -954,7 +1027,7 @@ def getConfig(self): """ Returns a copy of the last config received. """ return self.config - def db_getNodeDrivers(self, addr = None): + def db_getNodeDrivers(self, addr = None, init = False): """ Returns a list of nodes or a list of drivers that were saved in the database. @@ -986,7 +1059,9 @@ def db_getNodeDrivers(self, addr = None): for n in self._nodes: if self._nodes[n]['address'] == addr: return self._nodes[n]['drivers'] # this is an array - LOGGER.warning(f'{addr} not found in database.') + # ignore the warning if we're initialzing the node. + if not init: + LOGGER.warning(f'{addr} not found in database.') else: for n in self._nodes: nl.append(self._nodes[n]) @@ -1055,17 +1130,23 @@ def getNode(self, address): def renameNode(self, address, newname): """ - Rename a node from the Node Server + Rename a node from the Node Server. + Can we do this if the node is not on the internal node list? """ - LOGGER.info('Renaming node {}'.format(address)) - message = { + if address in self.nodes_internal: + LOGGER.info('Renaming node {}'.format(address)) + message = { 'renamenode': [{ 'address': address, 'name': newname }] - } + } + + self.send(message, 'command') + self.nodes_internal[address].name = newname + else: + LOGGER.error('renameNode: Node {} doesn\'t exist'.format(address)) - self.send(message, 'command') def delNode(self, address): """ @@ -1205,14 +1286,13 @@ def checkProfile(self, force=False, build_profile=None): LOGGER.debug('check_profile: force={} build_profile={}'.format( force, build_profile)) - """ FIXME: this should be from self._ifaceData """ cdata = self._ifaceData.profile_version LOGGER.debug('check_profile: saved_version={}'.format(cdata)) LOGGER.debug('check_profile: profile_version={}'.format( self.serverdata['profile_version'])) if self.serverdata['profile_version'] == "NotDefined": - LOGGER.error( + LOGGER.warning( 'check_profile: Ignoring since nodeserver does not have profile_version') return False diff --git a/udi_interface/node.py b/udi_interface/node.py index b124d0d..697eb04 100644 --- a/udi_interface/node.py +++ b/udi_interface/node.py @@ -53,7 +53,7 @@ def _convertDrivers(self, drivers): def updateDrivers(self, drivers): self.drivers = deepcopy(drivers) - def _updateDrivers(self, poly, address): + def _updateDrivers(self, poly, address, init=True): db_drivers = poly.db_getNodeDrivers(address) try: for drv in db_drivers: @@ -144,6 +144,7 @@ def rename(self, newname): } self.poly.send(message, 'command') + self.name = newname def reportCmd(self, command, value=None, uom=None): message = { @@ -164,13 +165,13 @@ def runCmd(self, command): fun = self.commands[command['cmd']] fun(self, command) else: - NLOGGER.error('command {} not defined'.format(command['cmd'])) + NLOGGER.error('node {} command {} not defined'.format(self.address,command['cmd'])) elif 'success' in command: if not command['success']: - NLOGGER.error('Command message failed: {}'.format(command)) + NLOGGER.error('Command message failed for node {}: {}'.format(self.address,command)) else: - NLOGGER.error('Invalid command message: {}'.format(command)) + NLOGGER.error('Invalid command message for node {}: {}'.format(self.address,command)) def start(self):