#!/usr/bin/env python3
"""A PyQt5 GUI utility to monitor and plot MQTT server messages."""
################################################################
# Written in 2018-2020 by Garth Zeglin <garthz@cmu.edu>
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
################################################################
# standard Python libraries
from __future__ import print_function
import os, sys, struct, time, logging, functools, queue, signal, getpass, math
# documentation: https://doc.qt.io/qt-5/index.html
# documentation: https://www.riverbankcomputing.com/static/Docs/PyQt5/index.html
from PyQt5 import QtCore, QtGui, QtWidgets, QtNetwork
# documentation: https://www.eclipse.org/paho/clients/python/docs/
import paho.mqtt.client as mqtt
# Module-level loggers: 'main' carries the application's own output, while
# 'mqtt' is the logger passed to the MQTT library (warnings and above only).
log = logging.getLogger('main')
mqtt_log = logging.getLogger('mqtt')
mqtt_log.setLevel(logging.WARNING)

# IDeATe server instances, as per https://mqtt.ideate.cmu.edu/#ports
# Each key is a server port number; each value is the associated course number.
ideate_ports = {
    8884: '16-223',
    8885: '16-375',
    8886: '60-223',
    8887: '62-362',
}

# Descriptions of the MQTT connection result codes, indexed by the code value.
mqtt_rc_codes = [
    'Success',
    'Incorrect protocol version',
    'Invalid client identifier',
    'Server unavailable',
    'Bad username or password',
    'Not authorized',
]
################################################################
# Particle plotting widget.
class QtParticles(QtWidgets.QWidget):
    """Custom widget to draw a 2D plot of a set of particles.

    Each particle has 5-D state: [x, y, r, g, b].  Each axis is defined to have
    unit scaling and is valid on [0,1].  Particles are kept in the
    ``particles`` dictionary, keyed by particle name.
    """

    def __init__(self):
        super().__init__()
        self.setMinimumSize(QtCore.QSize(100, 100))
        self.setAutoFillBackground(True)

        # Graphical state: maps a particle name to its [x, y, r, g, b] list.
        self.particles = {}

        # finish initialization
        self.show()
        return

    # === particle update methods ============================================================
    def update_particle_position(self, name, location):
        """Move the named particle, creating it with a mid-gray color if it is new.

        :param name: dictionary key identifying the particle
        :param location: sequence whose first two elements are the X and Y coordinates
        """
        # limit the coordinate range to slightly larger than the plotting bounds
        x = min(max(location[0], -0.1), 1.1)
        y = min(max(location[1], -0.1), 1.1)

        particle = self.particles.get(name)
        if particle is None:
            self.particles[name] = [x, y, 0.5, 0.5, 0.5]
        else:
            particle[0:2] = x, y

        # Schedule a paint event rather than forcing a synchronous repaint() so
        # that bursts of updates are coalesced by the Qt event loop.
        self.update()

    def update_particle_color(self, name, rgb):
        """Recolor the named particle, creating it at the origin if it is new.

        :param name: dictionary key identifying the particle
        :param rgb: sequence whose first three elements are the red, green, and blue values
        """
        # Read exactly three channels so the stored state always keeps five elements.
        red, green, blue = rgb[0], rgb[1], rgb[2]

        particle = self.particles.get(name)
        if particle is None:
            self.particles[name] = [0.0, 0.0, red, green, blue]
        else:
            particle[2:5] = red, green, blue

        self.update()

    # === Qt API methods ============================================================
    def paintEvent(self, e):
        """Subclass implementation of parent QWidget class callback to repaint the graphics."""
        geometry = self.geometry()
        view_width = geometry.width()
        view_height = geometry.height()

        # Nothing can be drawn in an empty view, and the scaling below would be degenerate.
        if view_width <= 0 or view_height <= 0:
            return

        # Set up a coordinate system scaled to unit dimension that keeps the
        # minimum visible area in view: the smaller of the two axis scale
        # factors guarantees both minimum extents fit.
        scene_width = 1.3           # minimum visible width
        scene_height = scene_width  # minimum visible height
        scaling = min(view_width / scene_width, view_height / scene_height)

        qp = QtGui.QPainter()
        qp.begin(self)
        try:
            # Clear the background.
            qp.fillRect(QtCore.QRectF(0, 0, view_width, view_height), QtCore.Qt.white)

            # Capture the default graphics transformation.
            qp.save()

            # Move the origin to the center (in pixel coordinates).
            qp.translate(QtCore.QPointF(view_width / 2, view_height / 2))

            # Apply scaling to draw in unit coordinates.
            qp.scale(scaling, scaling)

            # Translate in the new scaled coordinates to place the origin near the
            # upper left corner; the default coordinates using +Y pointing down.
            qp.translate(QtCore.QPointF(-0.5, -0.5))

            # Draw the bounds of the unit square.
            pen = QtGui.QPen(QtCore.Qt.black)
            pen.setWidthF(0.005)
            qp.setPen(pen)
            qp.drawRect(QtCore.QRectF(0.0, 0.0, 1.0, 1.0))

            # Draw the particles.  The Y dimension is inverted to a normal
            # mathematical plot with +Y up.  Color channels are folded into
            # [0, 1] by taking the magnitude and clamping at 1.
            color = QtGui.QColor()
            for x, y, red, green, blue in self.particles.values():
                color.setRgbF(min(1.0, abs(red)), min(1.0, abs(green)), min(1.0, abs(blue)), 1.0)
                qp.setBrush(QtGui.QBrush(color))
                qp.drawEllipse(QtCore.QPointF(x, 1.0 - y), 0.02, 0.02)

            # Restore the initial unscaled coordinates.
            qp.restore()
        finally:
            # Always release the painter, even if drawing raised.
            qp.end()
################################################################
# Main window and GUI controls.
class MainGUI(QtWidgets.QMainWindow):
    """A custom main window which provides all GUI controls.

    Requires a delegate main application object to handle user requests.  The
    window also acts as a file-like logging stream: text passed to write() is
    queued and later appended to the console text area by a timer callback.
    """

    def __init__(self, main, *args, **kwargs):
        super(MainGUI, self).__init__()

        # save the main object for delegating GUI events
        self.main = main

        # create the GUI elements
        self.console_queue = queue.Queue()
        self.setupUi()

        self._handler = None
        self.enable_console_logging()

        # finish initialization
        self.show()

        # manage the console output across threads
        self.console_timer = QtCore.QTimer()
        self.console_timer.timeout.connect(self._poll_console_queue)
        self.console_timer.start(50)  # units are milliseconds
        return

    # ------------------------------------------------------------------------------------------------
    def setupUi(self):
        """Build the tabbed window contents, the status bar, and the main menu."""
        self.setWindowTitle("IDeATe MQTT Plotter")
        self.resize(600, 600)

        # set up tabbed page structure
        self.tabs = QtWidgets.QTabWidget()
        self.setCentralWidget(self.tabs)

        # set up a main tab with the connection controls
        self.mainTab = QtWidgets.QWidget(self)
        self.tabs.addTab(self.mainTab, 'Main')
        self.verticalLayout = QtWidgets.QVBoxLayout(self.mainTab)
        self.verticalLayout.setContentsMargins(-1, -1, -1, 9)  # left, top, right, bottom

        # generate GUI for configuring the MQTT connection

        # server name entry and port selection
        hbox = QtWidgets.QHBoxLayout()
        self.verticalLayout.addLayout(hbox)
        hbox.addWidget(QtWidgets.QLabel("MQTT server address:"))
        self.mqtt_server_name = QtWidgets.QLineEdit()
        self.mqtt_server_name.setText(str(self.main.hostname))
        self.mqtt_server_name.editingFinished.connect(self.mqtt_server_name_entered)
        hbox.addWidget(self.mqtt_server_name)

        hbox.addWidget(QtWidgets.QLabel("port:"))
        self.port_selector = QtWidgets.QComboBox()
        hbox.addWidget(self.port_selector)

        # The first, empty entry represents "no port selected".
        self.port_selector.addItem("")
        for pairs in ideate_ports.items():
            self.port_selector.addItem("%d (%s)" % pairs)
        self.port_selector.activated['QString'].connect(self.mqtt_port_selected)

        # Attempt to pre-select the stored port number.  The stored value is
        # coerced to an integer since persisted settings may be read back as
        # text; a missing or unrecognized port leaves the empty entry selected.
        try:
            idx = list(ideate_ports.keys()).index(int(self.main.portnum))
            self.port_selector.setCurrentIndex(idx + 1)
        except (TypeError, ValueError):
            pass

        # instructions
        explanation = QtWidgets.QLabel("""Username and password provided by instructor.  Please see help tab for details.""")
        explanation.setWordWrap(True)
        self.verticalLayout.addWidget(explanation)

        # user and password entry
        hbox = QtWidgets.QHBoxLayout()
        self.verticalLayout.addLayout(hbox)
        hbox.addWidget(QtWidgets.QLabel("MQTT username:"))
        self.mqtt_username = QtWidgets.QLineEdit()
        self.mqtt_username.setText(str(self.main.username))
        self.mqtt_username.editingFinished.connect(self.mqtt_username_entered)
        hbox.addWidget(self.mqtt_username)

        hbox.addWidget(QtWidgets.QLabel("password:"))
        self.mqtt_password = QtWidgets.QLineEdit()
        self.mqtt_password.setText(str(self.main.password))
        self.mqtt_password.editingFinished.connect(self.mqtt_password_entered)
        hbox.addWidget(self.mqtt_password)

        # instructions
        explanation = QtWidgets.QLabel("""A subscription specifies topics to receive.  Please see help tab for details.""")
        explanation.setWordWrap(True)
        self.verticalLayout.addWidget(explanation)

        # subscription topic entry
        hbox = QtWidgets.QHBoxLayout()
        label = QtWidgets.QLabel("MQTT message subscription:")
        self.mqtt_sub = QtWidgets.QLineEdit()
        self.mqtt_sub.setText(self.main.subscription)
        self.mqtt_sub.editingFinished.connect(self.mqtt_sub_entered)
        hbox.addWidget(label)
        hbox.addWidget(self.mqtt_sub)
        self.verticalLayout.addLayout(hbox)

        # connection indicator
        self.connected = QtWidgets.QLabel()
        self.connected.setLineWidth(3)
        self.connected.setFrameStyle(QtWidgets.QFrame.Box)
        self.connected.setAlignment(QtCore.Qt.AlignCenter)
        sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
        self.connected.setSizePolicy(sizePolicy)
        self.set_connected_state(False)

        # connection control buttons
        connect = QtWidgets.QPushButton('Connect')
        connect.pressed.connect(self.connection_requested)
        disconnect = QtWidgets.QPushButton('Disconnect')
        disconnect.pressed.connect(self.main.disconnect_from_mqtt_server)
        hbox = QtWidgets.QHBoxLayout()
        hbox.addWidget(self.connected)
        hbox.addWidget(connect)
        hbox.addWidget(disconnect)
        self.verticalLayout.addLayout(hbox)

        # text area for displaying both internal and received messages
        self.consoleOutput = QtWidgets.QPlainTextEdit()
        self.consoleOutput.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
        self.verticalLayout.addWidget(self.consoleOutput)

        # set up the graphics tab
        self.plot = QtParticles()
        self.tabs.addTab(self.plot, 'Plot')

        # set up the help tab
        self.helpTab = QtWidgets.QWidget(self)
        self.tabs.addTab(self.helpTab, 'Help')
        self._make_help(self.helpTab)

        # set up the status bar which appears at the bottom of the window
        self.statusbar = QtWidgets.QStatusBar(self)
        self.setStatusBar(self.statusbar)

        # set up the main menu
        self.menubar = QtWidgets.QMenuBar(self)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 500, 22))
        self.menubar.setNativeMenuBar(False)
        self.menubar.setObjectName("menubar")
        self.menuTitle = QtWidgets.QMenu(self.menubar)
        self.setMenuBar(self.menubar)
        self.actionQuit = QtWidgets.QAction(self)
        self.menuTitle.addAction(self.actionQuit)
        self.menubar.addAction(self.menuTitle.menuAction())
        self.menuTitle.setTitle("File")
        self.actionQuit.setText("Quit")
        self.actionQuit.setShortcut("Ctrl+Q")
        self.actionQuit.triggered.connect(self.quitSelected)
        return

    # --- verbose function to create the help tab -----------------------------------------
    def _make_help(self, parent):
        """Fill the given parent widget with a read-only rich-text help page."""
        vbox = QtWidgets.QVBoxLayout(parent)
        hbox = QtWidgets.QHBoxLayout()
        vbox.addLayout(hbox)
        text = QtWidgets.QTextEdit()
        hbox.addWidget(text)

        # The leading named anchor gives scrollToAnchor() below a target at the
        # start of the document.
        text.insertHtml("""
<style type="text/css">
table { margin-left: 20px; }
td { padding-left: 20px; }
</style>
<a name="top"></a>
<h1>IDeATe MQTT Plotter</h1>
<p>This Python application is a tool intended for visualizing the content of multiple data streams passed as short numeric messages back and forth across the network via a MQTT server. It supports opening an authenticated connection to the server, subscribing to a class of messages in order to receive them, viewing message traffic, and plotting the received messages as points in a dynamically updated graphic.</p>
<h2>Connecting</h2>
<p>The first set of controls configures server parameters before attempting a connection. Changes will not take effect until the next connection attempt.</p>
<dl>
<dt>server address</dt><dd>The network name of the MQTT server. (Defaults to mqtt.ideate.cmu.edu.)</dd>
<dt>server port</dt><dd>The numeric port number for the MQTT server. IDeATe is using a separate server for each course, so the drop-down menu also identifies the associated course number.</dd>
<dt>username</dt><dd>Server-specific identity, chosen by your instructor.</dd>
<dt>password</dt><dd>Server-specific password, chosen by your instructor.</dd>
</dl>
<p>Your username and password is specific to the MQTT server and will be provided by your instructor. This may be individual or may be a shared login for all students in the course. Please note, the password will not be your Andrew password.</p>
<h2>Listening</h2>
<p>MQTT works on a publish/subscribe model in which messages are published on <i>topics</i> identified by a topic name. The name is structured like a path string separated by <tt>/</tt> characters to organize messages into a hierarchy of topics and subtopics.
Our course policy will be to prefix all topics with a student andrew ID, e.g. if your user name is xyzzy, we ask that you publish on the 'xyzzy' topic and sub-topics, as per the following examples.</p>
<p>
<table>
<tr><td><b>xyzzy</b></td><td>top-level topic on which user 'xyzzy' should publish</td></tr>
<tr><td><b>xyzzy/status</b></td><td>a sub-topic on which user 'xyzzy' could publish</td></tr>
<tr><td><b>xyzzy/sensor</b></td><td>another sub-topic on which user 'xyzzy' could publish</td></tr>
<tr><td><b>xyzzy/sensor/1</b></td><td>a possible sub-sub-topic</td></tr>
</table>
</p>
<p>The message subscription field specifies topics to receive. The subscription may include a # character as a wildcard, as per the following examples.</p>
<p><table>
<tr><td><b>#</b></td><td>subscribe to all messages (typical for this application)</td></tr>
<tr><td><b>xyzzy</b></td><td>subscribe to the top-level published messages for user xyzzy</td></tr>
<tr><td><b>xyzzy/#</b></td><td>subscribe to all published messages for user xyzzy, including subtopics</td></tr>
</table>
</p>
<p>Changing the subscription field immediately changes what is received; the monitor unsubscribes from the previous pattern and subscribes to the new one. Entering an empty field defaults to the global pattern '#'.</p>
<p>The large text field is the console area which shows both debugging and status log messages as well as received messages.</p>
<h2>Data Format</h2>
<p>Each message received is processed as plain text integer numbers separated by spaces. Either two or five value messages are supported: with two values, they are interpreted as the X and Y location ("X Y"), with five values as a position and an RGB color ("X Y R G B"). All values may range from 0 to 100 inclusive. The topic name is used to identify the point, so multiple messages on the same topic will dynamically move a plot point. Some sample messages follow.</p>
<p><table>
<tr><td><b>0 0</b></td><td>move the point to the lower left corner</td></tr>
<tr><td><b>50 50</b></td><td>move the point to the center</td></tr>
<tr><td><b>100 50 100 0 0</b></td><td>move to right edge and paint it red</td></tr>
<tr><td><b>75 75 0 0 100</b></td><td>move near upper right and paint it blue</td></tr>
</table>
</p>
<h2>More Information</h2>
<p>The IDeATE server has more detailed information on the server help page at <b>https://mqtt.ideate.cmu.edu</b></p>
""")
        text.scrollToAnchor("top")
        text.setReadOnly(True)
        return

    # --- logging to screen -------------------------------------------------------------
    def enable_console_logging(self):
        """Attach a root-logger handler that streams log records into this window's console."""
        # get the root logger to receive all logging traffic
        logger = logging.getLogger()

        # create a logging handler which writes to the console window via self.write
        handler = logging.StreamHandler(self)
        handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
        handler.setLevel(logging.NOTSET)
        self._handler = handler
        log.info("Enabled logging in console window.")
        return

    def disable_console_logging(self):
        """Detach the console logging handler from the root logger, if one is installed."""
        if self._handler is not None:
            logging.getLogger().removeHandler(self._handler)
            self._handler = None

    # --- window and qt event processing -------------------------------------------------------------
    def show_status(self, string):
        """Display a message in the status bar at the bottom of the window."""
        self.statusbar.showMessage(string)

    def _poll_console_queue(self):
        """Write any queued console text to the console text area from the main thread."""
        while True:
            # get_nowait() combines the emptiness test and the fetch in one step.
            try:
                item = self.console_queue.get_nowait()
            except queue.Empty:
                break
            stripped = str(item).rstrip()
            if stripped != "":
                self.consoleOutput.appendPlainText(stripped)
        return

    def write(self, string):
        """Write output to the console text area in a thread-safe way.  Qt only allows
        calls from the main thread, but the service routines run on separate threads."""
        self.console_queue.put(string)
        return

    def quitSelected(self):
        """Handle the File/Quit menu action by closing the window."""
        self.write("User selected quit.")
        self.close()

    def closeEvent(self, event):
        """Notify the delegate that the application is exiting, then stop console logging."""
        self.write("Received window close event.")
        self.main.app_is_exiting()
        self.disable_console_logging()
        super(MainGUI, self).closeEvent(event)

    def set_connected_state(self, flag):
        """Update the connection indicator label; only the value True shows 'Connected'."""
        if flag is True:
            self.connected.setText("  Connected   ")
            self.connected.setStyleSheet("color: white; background-color: green;")
        else:
            self.connected.setText(" Not Connected ")
            self.connected.setStyleSheet("color: white; background-color: blue;")

    # --- GUI widget event processing ----------------------------------------------------------------------
    def mqtt_server_name_entered(self):
        """Pass an edited server name to the delegate."""
        name = self.mqtt_server_name.text()
        self.write("Server name changed: %s" % name)
        self.main.set_server_name(name)

    def decode_port_selection(self):
        """Return the selected port number as an integer, or None if the empty entry is selected."""
        title = self.port_selector.currentText()
        if title == "":
            return None
        else:
            return int(title.split()[0])  # convert the first token to a number

    def mqtt_port_selected(self, title):
        """Pass a newly selected port to the delegate; title is the selected menu text."""
        portnum = self.decode_port_selection()
        self.write("Port selection changed: %s" % title)
        self.main.set_server_port(portnum)

    def mqtt_username_entered(self):
        """Pass an edited user name to the delegate."""
        name = self.mqtt_username.text()
        self.write("User name changed: %s" % name)
        self.main.set_username(name)

    def mqtt_password_entered(self):
        """Pass an edited password to the delegate."""
        password = self.mqtt_password.text()
        # Deliberately do not echo the password itself into the console log.
        self.write("Password changed.")
        self.main.set_password(password)

    def connection_requested(self):
        """Push all connection fields to the delegate and request a connection."""
        # When the connect button is pressed, make sure all fields are up to
        # date.  It is otherwise possible to leave a text field selected with
        # unreceived changes while pressing Connect.
        hostname = self.mqtt_server_name.text()
        portnum = self.decode_port_selection()
        username = self.mqtt_username.text()
        password = self.mqtt_password.text()

        self.main.set_server_name(hostname)
        self.main.set_server_port(portnum)
        self.main.set_username(username)
        self.main.set_password(password)

        self.main.connect_to_mqtt_server()

    def mqtt_sub_entered(self):
        """Pass an edited subscription pattern to the delegate; an empty field becomes '#'."""
        sub = self.mqtt_sub.text()
        if sub == '':
            self.mqtt_sub.setText("#")
            sub = "#"

        self.write("Subscription changed to: %s" % sub)
        self.main.set_subscription(sub)
################################################################
# Application controller holding the non-GUI state.
class MainApp(QtCore.QObject):
    """Main application object holding any non-GUI related state.

    Owns the persistent settings, the main window, and the MQTT client, and
    relays events arriving on the network thread to the main thread via Qt
    signals.
    """

    # class variable with Qt signal used to communicate between network thread and main thread
    _messageReceived = QtCore.pyqtSignal(str, bytes, name='_messageReceived')

    # Qt signal used to update the connection display (connected flag, status
    # bar text) from the network thread callbacks.
    _connectionChanged = QtCore.pyqtSignal(bool, str, name='_connectionChanged')

    def __init__(self):
        super(MainApp, self).__init__()

        # Attach a handler to the keyboard interrupt (control-C).
        signal.signal(signal.SIGINT, self._sigint_handler)

        # load any available persistent application settings
        QtCore.QCoreApplication.setOrganizationName("IDeATe")
        QtCore.QCoreApplication.setOrganizationDomain("ideate.cmu.edu")
        QtCore.QCoreApplication.setApplicationName('mqtt_plotter')
        self.settings = QtCore.QSettings()

        # uncomment to restore 'factory defaults'
        # self.settings.clear()

        # MQTT server settings
        self.hostname = self.settings.value('mqtt_host', 'mqtt.ideate.cmu.edu')
        self.portnum = self._to_port(self.settings.value('mqtt_port', None))
        self.username = self.settings.value('mqtt_user', 'students')
        self.password = self.settings.value('mqtt_password', '(not yet entered)')

        # Default subscription.  The hash mark is a wildcard matching all topics.
        self.subscription = self.settings.value('mqtt_subscription', '#')

        # create the interface window
        self.window = MainGUI(self)

        # Initialize the MQTT client system
        self.client = mqtt.Client()
        self.client.enable_logger(mqtt_log)
        self.client.on_log = self.on_log
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.tls_set()

        # Connect the signals used to transfer events from the network server thread to the main thread.
        self._messageReceived.connect(self.process_message)
        self._connectionChanged.connect(self._show_connection_state)

        self.window.show_status("Please set the MQTT server address and select Connect.")
        return

    @staticmethod
    def _to_port(value):
        """Return a stored port setting as an int, or None if it is absent or not numeric."""
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    ################################################################
    def app_is_exiting(self):
        """Disconnect from the server if needed and stop the network loop."""
        if self.client.is_connected():
            self.client.disconnect()
            self.client.loop_stop()

    def _sigint_handler(self, signal, frame):
        """Keyboard interrupt handler: run the close handlers and exit the process."""
        print("Keyboard interrupt caught, running close handlers...")
        self.app_is_exiting()
        sys.exit(0)

    ################################################################
    def set_server_name(self, name):
        """Record and persist the MQTT server host name."""
        self.hostname = name
        self.settings.setValue('mqtt_host', name)

    def set_server_port(self, value):
        """Record and persist the MQTT server port number (None means no port selected)."""
        self.portnum = value
        self.settings.setValue('mqtt_port', self.portnum)

    def set_username(self, name):
        """Record and persist the MQTT user name."""
        self.username = name
        self.settings.setValue('mqtt_user', name)

    def set_password(self, name):
        """Record and persist the MQTT password."""
        self.password = name
        self.settings.setValue('mqtt_password', name)

    def connect_to_mqtt_server(self):
        """Begin an asynchronous connection to the configured server, if not already connected."""
        if self.client.is_connected():
            self.window.write("Already connected.")
            return

        # Normalize the port in case it is held as text.
        portnum = self._to_port(self.portnum)
        if portnum is None:
            log.warning("Please specify the server port before attempting connection.")
            return

        log.debug("Initiating MQTT connection to %s:%d", self.hostname, portnum)
        self.window.write("Attempting connection.")
        self.client.username_pw_set(self.username, self.password)
        self.client.connect_async(self.hostname, portnum)
        self.client.loop_start()

    def disconnect_from_mqtt_server(self):
        """Disconnect from the server if connected, then stop the network loop."""
        if self.client.is_connected():
            self.client.disconnect()
        else:
            self.window.write("Not connected.")
        self.client.loop_stop()

    ################################################################
    @QtCore.pyqtSlot(bool, str)
    def _show_connection_state(self, connected, status):
        """Update the connection indicator and status bar; invoked via the _connectionChanged signal."""
        self.window.set_connected_state(connected)
        self.window.show_status(status)

    # The callback for when the broker responds to our connection request.
    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result; subscribe and show 'Connected' only on success (rc == 0)."""
        self.window.write("Connected to server with flags: %s, result code: %s" % (flags, rc))

        if rc == 0:
            log.info("Connection succeeded.")

            # Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
            client.subscribe(self.subscription)
            self._connectionChanged.emit(True, "Connected.")
        else:
            if 0 < rc < len(mqtt_rc_codes):
                log.warning("Connection failed with error: %s", mqtt_rc_codes[rc])
            else:
                log.warning("Connection failed with unknown error %d", rc)
            self._connectionChanged.emit(False, "Connection failed.")
        return

    # The callback for when the broker responds with error messages.
    def on_log(self, client, userdata, level, buf):
        """Forward a log message from the MQTT client to the application log; buf is the message text."""
        log.debug("on_log level %s: %s", level, buf)
        return

    def on_disconnect(self, client, userdata, rc):
        """Report that the connection to the server has ended."""
        log.debug("disconnected")
        self.window.write("Disconnected from server.")
        self._connectionChanged.emit(False, "Disconnected.")

    # The callback for when a message has been received on a topic to which this
    # client is subscribed.  The message variable is a MQTTMessage that describes
    # all of the message parameters.

    # Some useful MQTTMessage fields: topic, payload, qos, retain, mid, properties.
    #   The payload is a binary string (bytes).
    #   qos is an integer quality of service indicator (0,1, or 2)
    #   mid is an integer message ID.

    # N.B. this function is called from the network server thread, but the actual
    # processing needs to happen on the main thread for graphic output.
    def on_message(self, client, userdata, msg):
        """Log a received message and hand it to the main thread via the _messageReceived signal."""
        self.window.write("{%s} %s" % (msg.topic, msg.payload))
        self._messageReceived.emit(msg.topic, msg.payload)

    @QtCore.pyqtSlot(str, bytes)
    def process_message(self, topic, payload):
        """Parse a received message and update the corresponding plot particle.

        :param topic: message topic, used as the particle name
        :param payload: message body holding whitespace-separated integers
        """
        # Update the particle plotter.  The name of the particle is the topic
        # name, and the data updated depends on the message format.
        name = topic

        # Parse the message text by attempting to convert all tokens to integers.  The values
        # are scaled so that the default range is 0 to 100.
        tokens = payload.split()
        try:
            values = [(0.01 * int(x)) for x in tokens]
            if len(values) > 1:
                self.window.plot.update_particle_position(name, values[0:2])

            if len(values) > 4:
                self.window.plot.update_particle_color(name, values[2:5])

        except ValueError:
            log.warning("Error parsing message into numbers: %s %s", topic, payload)

        return

    ################################################################
    def set_subscription(self, sub):
        """Record and persist a new subscription pattern, re-subscribing if currently connected."""
        if self.client.is_connected():
            self.client.unsubscribe(self.subscription)
            try:
                self.client.subscribe(sub)
                self.subscription = sub
                self.settings.setValue('mqtt_subscription', sub)
            except ValueError:
                # Restore the previous subscription if the new pattern was rejected.
                self.window.write("Invalid subscription string, not changed.")
                self.client.subscribe(self.subscription)
        else:
            self.subscription = sub
            self.settings.setValue('mqtt_subscription', sub)
################################################################
def main():
    """Create the Qt application and the application controller, then run the event loop until exit."""
    # Optionally add an additional root log handler to stream messages to the terminal console.
    log_to_terminal = False
    if log_to_terminal:
        terminal_handler = logging.StreamHandler()
        terminal_handler.setLevel(logging.DEBUG)
        terminal_handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
        logging.getLogger().addHandler(terminal_handler)

    # initialize the Qt system itself
    app = QtWidgets.QApplication(sys.argv)

    # create the main application controller; the local reference keeps it
    # alive for the duration of the event loop
    controller = MainApp()

    # run the event loop until the user is done, then exit with its result
    log.info("Starting event loop.")
    exit_code = app.exec_()
    sys.exit(exit_code)

################################################################
# Main script follows.  This sequence is executed when the script is initiated from the command line.
if __name__ == "__main__":
    main()