Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,14 @@ It is best to use event handlers to access configuration data as then you are as

__isConnected()__ which tells you if this NodeServer and Polyglot are connected via MQTT.

__addNode(node, conn_status = None)__ Adds a new node to Polyglot. You fist need to instantiate a
__addNode(node, conn_status = None, rename = False)__ Adds a new node to Polyglot. You fist need to instantiate a
node using your custom class, which you then pass to addNode. Return value
is the node passed in.

If conn\_status is set to a driver string, this node and that driver specified will be used by PG3 to represent the connection status (0 = disconnected, 1 = connected, 2 = failed). By default, conn\_status is None.
If conn\_status is set to a driver string, this node and the driver specified will be used by PG3 to represent the connection status (0 = disconnected, 1 = connected, 2 = failed). By default, conn\_status is None. When the node server connects via MQTT, PG3 will send a value of 1 to the ISY/IoP for that node/driver. When PG3 stops the node server it will send a value of 0 to the ISY/IoP for that node/driver. If the MQTT connection drops for any other reason, PG3 will send a value of 2 to the ISY/IoP for that node/driver. Node server should use uom 25 to display status correctly.

If rename is set to True, addNode will attempt to rename the node on the ISY and in the PG3 database to match the name specified in node. When rename is set to False, the name of the node on the ISY and in PG3's database record are unchanged.


Notes:
1. Only Node class common information is stored in the database, not your
Expand All @@ -248,15 +251,15 @@ Notes:
a list of nodes available in the onConfig handler. However, you should
still call __addNode()__ for each node to replace the generic node object
with your custom node class object.
```

__getConfig()__ Returns a copy of the last config received.

__getNodes()__ Returns your list of nodes. The interface will attempt to wrap
the list of nodes from Polyglot with your custom classes. But this can fail
if your custom class needs additional parameters when creating the class
object. Your node server should call addNode() to make sure the objects on
this list are your custom class objects.
this list are your custom class objects. Note that the behavior is undefined
if you try to modify the list while iterating it.

__getNode(address)__ Returns a single node.

Expand Down
2 changes: 1 addition & 1 deletion udi_interface/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .custom import Custom
from .isy import ISY

__version__ = '3.0.47'
__version__ = '3.0.51'
__description__ = 'UDI Python Interface for Polyglot version 3'
__url__ = 'https://github.com/UniversalDevicesInc/udi_python_interface'
__author__ = 'Universal Devices Inc.'
Expand Down
148 changes: 114 additions & 34 deletions udi_interface/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
import markdown2
import os
from os.path import join, expanduser
from os.path import join, expanduser, exists
import paho.mqtt.client as mqtt
try:
import queue
Expand Down Expand Up @@ -70,6 +70,17 @@ class pub(object):
cfg_threads = []
topic_data = []

'''
when called, this adds a callback, address pair to a topic. The 'topics'
dictionary will look like:
topics = {
config: [(callback, address), (callback, address)],
start: [(callback, address), (callback, address)],
}

When something subscribes, send any previous published events for the
topic to the subscriber. I.E. backlog events.
'''
@staticmethod
def subscribe(topic, callback, address):
if int(topic) >= len(pub.topic_list):
Expand All @@ -80,36 +91,55 @@ def subscribe(topic, callback, address):
else:
pub.topics[pub.topic_list[topic]].append([callback, address])

# QUESTION: Should this publish any existing info to the subscriber?
# Send backlog events if any
for item in pub.topic_data:
if item[0] == topic and item[1] == address:
if item[0] == topic and (address == None or item[1] == address):
Thread(target=callback, args=item[2:]).start()


'''
when we publish an event, we first push the event on to the backlog
queue so that any later subscribers will get the event when they
subscribe.
'''
@staticmethod
def publish(topic, address, *argv):
pub.topic_data.append([topic, address, *argv])

# check if anyone has subscribed to this event
if pub.topic_list[topic] in pub.topics:
# loop through all of the subscribers
for item in pub.topics[pub.topic_list[topic]]:
if item[1] == address:
'''
With the exception of the START event, all others are
published with address == None.

For the START event we want to check the subscribers
address filter value (item[1]) and compare it with the
address in the event. For all other events we don't
really care.
'''
if address == None or item[1] == address:
Thread(target=item[0], args=[*argv]).start()

@staticmethod
def publish_nt(topic, address, *argv):
pub.topic_data.append([topic, address, *argv])

if pub.topic_list[topic] in pub.topics:
for item in pub.topics[pub.topic_list[topic]]:
if item[1] == address:
if address == None or item[1] == address:
t = Thread(target=item[0], args=[*argv])
pub.cfg_threads.append(t)
t.start()

@staticmethod
def publish_wait(topic, address, *argv):
pub.topic_data.append([topic, address, *argv])

if pub.topic_list[topic] in pub.topics:
for item in pub.topics[pub.topic_list[topic]]:
if item[1] == address:
if address == None or item[1] == address:
Thread(target=pub.waitForFinish, args=[item[0], *argv]).start()


Expand Down Expand Up @@ -196,20 +226,16 @@ def __init__(self, classes, envVar=None):
self._threads['input'] = Thread(
target=self._parseInput, name='Command')
self._mqttc = mqtt.Client(self.id, True)
self._mqttc.username_pw_set(self.id, self.pg3init['token'])
self._mqttc.on_connect = self._connect
self._mqttc.on_message = self._message
self._mqttc.on_subscribe = self._subscribe
self._mqttc.on_disconnect = self._disconnect
self._mqttc.on_publish = self._publish
self._mqttc.on_log = self._log
self.using_mosquitto = True
self.useSecure = True
self._nodes = {}
self.nodes_internal = {}
if self.pg3init['secure'] == 1:
self.sslContext = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
self.sslContext.check_hostname = False
self._mqttc.tls_set_context(self.sslContext)
self.loop = None
self.inQueue = queue.Queue()
self.isyVersion = None
Expand Down Expand Up @@ -269,6 +295,14 @@ def _connect(self, mqttc, userdata, flags, rc):
results = []
LOGGER.info("MQTT Connected with result code " +
str(rc) + " (Success)")

# Publish connection status and set up will
if self.using_mosquitto:
connected = {"connected": [{}]}
self._mqttc.publish('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(connected), retain=True)
failed = {"disconnected": [{}]}
self._mqttc.will_set('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(failed), qos=0, retain=True)

results.append((self.topicInput, tuple(
self._mqttc.subscribe(self.topicInput))))
for (topic, (result, mid)) in results:
Expand Down Expand Up @@ -380,7 +414,16 @@ def _message(self, mqttc, userdata, msg):
elif key == 'setLogLevelList':
LOGGER.info('setLogList response {}'.format(parsed_msg[key]))
elif key == 'renamenode':
# [{'address': 'addr_0001', 'name': 'today', 'success': True}]
LOGGER.info('renamenode response {}'.format(parsed_msg[key]))
for resp in parsed_msg[key]:
try:
if resp['success']:
addr = resp['address']
self.nodes_internal[addr].name = resp['name']
except Exception as error:
LOGGER.error('Failed to update internal nodelist: {} :: {}'.format(resp, error))

else:
LOGGER.error(
'Invalid command received in message from PG3: \'{}\' {}'.format(key, parsed_msg[key]))
Expand Down Expand Up @@ -445,6 +488,34 @@ def _startMqtt(self):
"""
LOGGER.info('Connecting to MQTT... {}:{}'.format(
self._server, self._port))

self.username = self.id.replace(':', '')

# Load the client SSL certificate. What if this fails?
if self.pg3init['secure'] == 1:
self.sslContext = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
self.sslContext.check_hostname = False
cert = self.username + ".cert"
key = self.username + ".key"

# only if certs exist!
if exists(cert) and exists(key):
LOGGER.info('Using SSL certs: {} {}'.format(cert, key))
self.sslContext.load_cert_chain(cert, key)
self.using_mosquitto = True
else:
self.username = self.id
self.using_mosquitto = False

self._mqttc.tls_set_context(self.sslContext)

self._mqttc.username_pw_set(self.username, self.pg3init['token'])

if self.using_mosquitto:
# Set up the will, do we need this here?
failed = {"disconnected": [{}]}
self._mqttc.will_set('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(failed), qos=0, retain=True)

done = False
while not done:
try:
Expand All @@ -467,22 +538,23 @@ def _startMqtt(self):
def _get_server_data(self):
"""
_get_server_data: Loads the server.json and returns as a dict
:param check_profile: Calls the check_profile method if True

If profile_version in json is null then profile will be loaded on
every restart.

"""
self.serverdata = {'version': 'unknown'}
self.serverdata = {'version': '0.0.0', 'profile_version': 'NotDefined'}

# Read the SERVER info from the json.
try:
with open(Interface.SERVER_JSON_FILE_NAME) as data:
self.serverdata = json.load(data)
data.close()
except Exception as err:
LOGGER.error('get_server_data: failed to read file {0}: {1}'.format(
Interface.SERVER_JSON_FILE_NAME, err), exc_info=True)
"""
Failure to load the server.json file is no longer an error.
The only things that may be used from the server.json file are
the version number (as a fallback if start isn't called with one)
and the profile_version for checkProfile().
"""
LOGGER.warning('get_server_data: failed to read file {0}: {1}'.format(
Interface.SERVER_JSON_FILE_NAME, err), exc_info=False)
return

# Get the version info
Expand Down Expand Up @@ -512,6 +584,8 @@ def stop(self):
LOGGER.info('Disconnecting from MQTT... {}:{}'.format(
self._server, self._port))
self._mqttc.loop_stop()
disconnect = {"disconnected": [{}]}
self._mqttc.publish('udi/pg3/connections/ns/{}'.format(self.id), json.dumps(disconnect), retain=True)
self._mqttc.disconnect()

def send(self, message, type):
Expand Down Expand Up @@ -896,6 +970,8 @@ def start(self, version=None):
# Tell PG3 our version
if version:
self.serverdata['version'] = version
else:
LOGGER.warning('No node server version specified. Using deprecated server.json version')


def ready(self):
Expand All @@ -912,16 +988,12 @@ def isConnected(self):
""" Tells you if this nodeserver and Polyglot are connected via MQTT """
return self.connected

def addNode(self, node, conn_status=None):
def addNode(self, node, conn_status=None, rename=False):
"""
Add a node to the NodeServer

:param node: Dictionary of node settings. Keys: address, name, node_def_id, primary, and drivers are required.
"""
if node.address in self._nodes and self._nodes[node.address]['name'] != node.name:
LOGGER.warning("addNode(): Cannot be used to change the node's name")
node.name = self._nodes[node.address]['name']

LOGGER.info('Adding node {}({}) [{}]'.format(node.name, node.address, node.private))
message = {
'addnode': [{
Expand All @@ -931,7 +1003,8 @@ def addNode(self, node, conn_status=None):
'primaryNode': node.primary,
'drivers': node.drivers,
'hint': node.hint,
'private': node.private
'private': node.private,
'rename': rename
}]
}
self.send(message, 'command')
Expand All @@ -954,7 +1027,7 @@ def getConfig(self):
""" Returns a copy of the last config received. """
return self.config

def db_getNodeDrivers(self, addr = None):
def db_getNodeDrivers(self, addr = None, init = False):
"""
Returns a list of nodes or a list of drivers that were saved in the
database.
Expand Down Expand Up @@ -986,7 +1059,9 @@ def db_getNodeDrivers(self, addr = None):
for n in self._nodes:
if self._nodes[n]['address'] == addr:
return self._nodes[n]['drivers'] # this is an array
LOGGER.warning(f'{addr} not found in database.')
# ignore the warning if we're initialzing the node.
if not init:
LOGGER.warning(f'{addr} not found in database.')
else:
for n in self._nodes:
nl.append(self._nodes[n])
Expand Down Expand Up @@ -1055,17 +1130,23 @@ def getNode(self, address):

def renameNode(self, address, newname):
"""
Rename a node from the Node Server
Rename a node from the Node Server.
Can we do this if the node is not on the internal node list?
"""
LOGGER.info('Renaming node {}'.format(address))
message = {
if address in self.nodes_internal:
LOGGER.info('Renaming node {}'.format(address))
message = {
'renamenode': [{
'address': address,
'name': newname
}]
}
}

self.send(message, 'command')
self.nodes_internal[address].name = newname
else:
LOGGER.error('renameNode: Node {} doesn\'t exist'.format(address))

self.send(message, 'command')

def delNode(self, address):
"""
Expand Down Expand Up @@ -1205,14 +1286,13 @@ def checkProfile(self, force=False, build_profile=None):
LOGGER.debug('check_profile: force={} build_profile={}'.format(
force, build_profile))

""" FIXME: this should be from self._ifaceData """
cdata = self._ifaceData.profile_version

LOGGER.debug('check_profile: saved_version={}'.format(cdata))
LOGGER.debug('check_profile: profile_version={}'.format(
self.serverdata['profile_version']))
if self.serverdata['profile_version'] == "NotDefined":
LOGGER.error(
LOGGER.warning(
'check_profile: Ignoring since nodeserver does not have profile_version')
return False

Expand Down
9 changes: 5 additions & 4 deletions udi_interface/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _convertDrivers(self, drivers):
def updateDrivers(self, drivers):
self.drivers = deepcopy(drivers)

def _updateDrivers(self, poly, address):
def _updateDrivers(self, poly, address, init=True):
db_drivers = poly.db_getNodeDrivers(address)
try:
for drv in db_drivers:
Expand Down Expand Up @@ -144,6 +144,7 @@ def rename(self, newname):
}

self.poly.send(message, 'command')
self.name = newname

def reportCmd(self, command, value=None, uom=None):
message = {
Expand All @@ -164,13 +165,13 @@ def runCmd(self, command):
fun = self.commands[command['cmd']]
fun(self, command)
else:
NLOGGER.error('command {} not defined'.format(command['cmd']))
NLOGGER.error('node {} command {} not defined'.format(self.address,command['cmd']))
elif 'success' in command:
if not command['success']:
NLOGGER.error('Command message failed: {}'.format(command))
NLOGGER.error('Command message failed for node {}: {}'.format(self.address,command))

else:
NLOGGER.error('Invalid command message: {}'.format(command))
NLOGGER.error('Invalid command message for node {}: {}'.format(self.address,command))


def start(self):
Expand Down