# Source code for mqtt.qt_mqtt_plotter

#!/usr/bin/env python3
"""A PyQt5 GUI utility to monitor and plot MQTT server messages."""

################################################################
# Written in 2018-2020 by Garth Zeglin <garthz@cmu.edu>

# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.

# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

################################################################
# standard Python libraries
from __future__ import print_function
import os, sys, struct, time, logging, functools, queue, signal, getpass, math

# documentation: https://doc.qt.io/qt-5/index.html
# documentation: https://www.riverbankcomputing.com/static/Docs/PyQt5/index.html
from PyQt5 import QtCore, QtGui, QtWidgets, QtNetwork

# documentation: https://www.eclipse.org/paho/clients/python/docs/
import paho.mqtt.client as mqtt

# Application logger used for this program's own messages.
log = logging.getLogger('main')

# Dedicated logger handed to the MQTT library; its threshold is set to
# WARNING so only warnings and errors from this logger are processed.
mqtt_log = logging.getLogger('mqtt')
mqtt_log.setLevel(logging.WARNING)

# IDeATe server instances, as per https://mqtt.ideate.cmu.edu/#ports
# Each key is a server port number and each value is the associated course
# number.  The insertion order is significant: the GUI port selector is filled
# by iterating over this table.
ideate_ports = {
    8884: '16-223',
    8885: '16-375',
    8886: '60-223',
    8887: '62-362',
}

# Human-readable text for the MQTT connection result codes, indexed by code.
mqtt_rc_codes = [
    'Success',
    'Incorrect protocol version',
    'Invalid client identifier',
    'Server unavailable',
    'Bad username or password',
    'Not authorized',
]

################################################################
class QtParticles(QtWidgets.QWidget):
    """Custom widget to draw a 2D plot of a set of particles.

    Each particle has 5-D state: [x, y, r, g, b].  Each axis is defined to have
    unit scaling and is valid on [0,1].  Particles are stored by name in the
    ``particles`` dictionary and are created on first update.
    """

    def __init__(self):
        super().__init__()
        self.setMinimumSize(QtCore.QSize(100, 100))
        self.setAutoFillBackground(True)

        # Graphical state variables: maps a particle name to its mutable
        # [x, y, r, g, b] list.
        self.particles = {}

        # finish initialization
        self.show()
        return

    # === particle update methods ============================================================
    def update_particle_position(self, name, location):
        """Move the named particle, creating it with a gray color if it is new.

        :param name: dictionary key identifying the particle
        :param location: sequence whose first two elements are the X and Y coordinates
        """
        # limit the coordinate range to slightly larger than the plotting bounds
        x = min(max(location[0], -0.1), 1.1)
        y = min(max(location[1], -0.1), 1.1)

        particle = self.particles.get(name)
        if particle is None:
            self.particles[name] = [x, y, 0.5, 0.5, 0.5]
        else:
            particle[0:2] = x, y

        # Schedule a repaint instead of painting synchronously so that a burst
        # of updates is coalesced by Qt into a single paint event.
        self.update()

    def update_particle_color(self, name, rgb):
        """Recolor the named particle, creating it at the origin if it is new.

        :param name: dictionary key identifying the particle
        :param rgb: sequence of three color components [r, g, b]
        """
        particle = self.particles.get(name)
        if particle is None:
            self.particles[name] = [0.0, 0.0, rgb[0], rgb[1], rgb[2]]
        else:
            particle[2:5] = rgb

        # See update_particle_position: schedule rather than force the repaint.
        self.update()

    # === Qt API methods ============================================================
    def paintEvent(self, e):
        """Subclass implementation of parent QWidget class callback to repaint the graphics."""
        geometry = self.geometry()
        view_width = geometry.width()
        view_height = geometry.height()

        # Nothing can be drawn in an empty view, and the aspect ratio
        # calculation below would divide by zero.
        if view_width <= 0 or view_height <= 0:
            return

        qp = QtGui.QPainter()
        if not qp.begin(self):
            return

        try:
            # Clear the background.
            qp.fillRect(QtCore.QRectF(0, 0, view_width, view_height), QtCore.Qt.white)
            # qp.setRenderHint(QtGui.QPainter.Antialiasing)

            # Set up a coordinate system scaled to unit dimension that keeps the
            # minimum visible area in view.
            scene_width = 1.3              # minimum visible width
            scene_height = scene_width     # minimum visible height
            scene_aspect = scene_width / scene_height
            view_aspect = view_width / view_height
            if scene_aspect > view_aspect:
                scaling = view_width / scene_width
            else:
                scaling = view_height / scene_height

            # Capture the default graphics transformation.
            qp.save()

            # Move the origin to the center (in pixel coordinates).
            qp.translate(QtCore.QPointF(view_width/2, view_height/2))

            # Apply scaling to draw in unit coordinates.
            qp.scale(scaling, scaling)

            # Translate in the new scaled coordinates to place the origin near the
            # upper left corner; the default coordinates using +Y pointing down.
            qp.translate(QtCore.QPointF(-0.5, -0.5))

            # Draw the bounds of the unit square.
            pen = QtGui.QPen(QtCore.Qt.black)
            pen.setWidthF(0.005)
            qp.setPen(pen)
            qp.drawRect(QtCore.QRectF(0.0, 0.0, 1.0, 1.0))

            # Draw the particles.  The Y dimension is inverted to a normal
            # mathematical plot with +Y up.  Color components are folded to
            # non-negative values and clamped at 1.0.
            color = QtGui.QColor()
            for particle in self.particles.values():
                color.setRgbF(min(1.0, abs(particle[2])),
                              min(1.0, abs(particle[3])),
                              min(1.0, abs(particle[4])),
                              1.0)
                brush = QtGui.QBrush(color)
                qp.setBrush(brush)
                qp.drawEllipse(QtCore.QPointF(particle[0], 1.0 - particle[1]), 0.02, 0.02)

            # Restore the initial unscaled coordinates.
            qp.restore()
        finally:
            # Always release the painter, even if drawing raised.
            qp.end()
################################################################
class MainGUI(QtWidgets.QMainWindow):
    """A custom main window which provides all GUI controls.

    Requires a delegate main application object to handle user requests.  The
    delegate is read for its hostname, portnum, username, password, and
    subscription attributes, and is called back through its set_* methods,
    connect_to_mqtt_server, disconnect_from_mqtt_server, and app_is_exiting.
    """

    def __init__(self, main, *args, **kwargs):
        super(MainGUI,self).__init__()

        # save the main object for delegating GUI events
        self.main = main

        # create the GUI elements; the queue must exist before any console output
        self.console_queue = queue.Queue()
        self.setupUi()

        self._handler = None
        self.enable_console_logging()

        # finish initialization
        self.show()

        # manage the console output across threads
        self.console_timer = QtCore.QTimer()
        self.console_timer.timeout.connect(self._poll_console_queue)
        self.console_timer.start(50)  # units are milliseconds

        return

    # ------------------------------------------------------------------------------------------------
    def setupUi(self):
        """Create all widgets, tabs, the status bar, and the menu bar."""
        self.setWindowTitle("IDeATe MQTT Plotter")
        self.resize(600, 600)

        # set up tabbed page structure
        self.tabs = QtWidgets.QTabWidget()
        self.setCentralWidget(self.tabs)

        # set up a main tab with the connection controls
        self.mainTab = QtWidgets.QWidget(self)
        self.tabs.addTab(self.mainTab, 'Main')
        self.verticalLayout = QtWidgets.QVBoxLayout(self.mainTab)
        self.verticalLayout.setContentsMargins(-1, -1, -1, 9) # left, top, right, bottom

        # generate GUI for configuring the MQTT connection

        # server name entry and port selection
        hbox = QtWidgets.QHBoxLayout()
        self.verticalLayout.addLayout(hbox)
        hbox.addWidget(QtWidgets.QLabel("MQTT server address:"))
        self.mqtt_server_name = QtWidgets.QLineEdit()
        self.mqtt_server_name.setText(str(self.main.hostname))
        self.mqtt_server_name.editingFinished.connect(self.mqtt_server_name_entered)
        hbox.addWidget(self.mqtt_server_name)

        hbox.addWidget(QtWidgets.QLabel("port:"))
        self.port_selector = QtWidgets.QComboBox()
        hbox.addWidget(self.port_selector)

        # the first entry is blank to represent 'no port selected'
        self.port_selector.addItem("")
        for pairs in ideate_ports.items():
            self.port_selector.addItem("%d (%s)" % pairs)
        self.port_selector.activated['QString'].connect(self.mqtt_port_selected)

        # Attempt to pre-select the stored port number.  The stored value is
        # converted to an integer first since persistent settings may hand it
        # back as text, which would never match the integer table keys.
        try:
            stored_port = int(self.main.portnum)
            idx = list(ideate_ports.keys()).index(stored_port)
        except (TypeError, ValueError):
            pass
        else:
            # offset by one to skip the blank first entry
            self.port_selector.setCurrentIndex(idx+1)

        # instructions
        explanation = QtWidgets.QLabel("""Username and password provided by instructor. Please see help tab for details.""")
        explanation.setWordWrap(True)
        self.verticalLayout.addWidget(explanation)

        # user and password entry
        hbox = QtWidgets.QHBoxLayout()
        self.verticalLayout.addLayout(hbox)
        hbox.addWidget(QtWidgets.QLabel("MQTT username:"))
        self.mqtt_username = QtWidgets.QLineEdit()
        self.mqtt_username.setText(str(self.main.username))
        self.mqtt_username.editingFinished.connect(self.mqtt_username_entered)
        hbox.addWidget(self.mqtt_username)

        hbox.addWidget(QtWidgets.QLabel("password:"))
        self.mqtt_password = QtWidgets.QLineEdit()
        self.mqtt_password.setText(str(self.main.password))
        self.mqtt_password.editingFinished.connect(self.mqtt_password_entered)
        hbox.addWidget(self.mqtt_password)

        # instructions
        explanation = QtWidgets.QLabel("""A subscription specifies topics to receive. Please see help tab for details.""")
        explanation.setWordWrap(True)
        self.verticalLayout.addWidget(explanation)

        # subscription topic entry
        hbox = QtWidgets.QHBoxLayout()
        label = QtWidgets.QLabel("MQTT message subscription:")
        self.mqtt_sub = QtWidgets.QLineEdit()
        self.mqtt_sub.setText(self.main.subscription)
        self.mqtt_sub.editingFinished.connect(self.mqtt_sub_entered)
        hbox.addWidget(label)
        hbox.addWidget(self.mqtt_sub)
        self.verticalLayout.addLayout(hbox)

        # connection indicator
        self.connected = QtWidgets.QLabel()
        self.connected.setLineWidth(3)
        self.connected.setFrameStyle(QtWidgets.QFrame.Box)
        self.connected.setAlignment(QtCore.Qt.AlignCenter)
        sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Fixed, QtWidgets.QSizePolicy.Fixed)
        self.connected.setSizePolicy(sizePolicy)
        self.set_connected_state(False)

        # connection control buttons
        connect = QtWidgets.QPushButton('Connect')
        connect.pressed.connect(self.connection_requested)
        disconnect = QtWidgets.QPushButton('Disconnect')
        disconnect.pressed.connect(self.main.disconnect_from_mqtt_server)
        hbox = QtWidgets.QHBoxLayout()
        hbox.addWidget(self.connected)
        hbox.addWidget(connect)
        hbox.addWidget(disconnect)
        self.verticalLayout.addLayout(hbox)

        # text area for displaying both internal and received messages
        self.consoleOutput = QtWidgets.QPlainTextEdit()
        self.consoleOutput.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
        self.verticalLayout.addWidget(self.consoleOutput)

        # set up the graphics tab
        self.plot = QtParticles()
        self.tabs.addTab(self.plot, 'Plot')

        # set up the help tab
        self.helpTab = QtWidgets.QWidget(self)
        self.tabs.addTab(self.helpTab, 'Help')
        self._make_help(self.helpTab)

        # set up the status bar which appears at the bottom of the window
        self.statusbar = QtWidgets.QStatusBar(self)
        self.setStatusBar(self.statusbar)

        # set up the main menu
        self.menubar = QtWidgets.QMenuBar(self)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 500, 22))
        self.menubar.setNativeMenuBar(False)
        self.menubar.setObjectName("menubar")
        self.menuTitle = QtWidgets.QMenu(self.menubar)
        self.setMenuBar(self.menubar)
        self.actionQuit = QtWidgets.QAction(self)
        self.menuTitle.addAction(self.actionQuit)
        self.menubar.addAction(self.menuTitle.menuAction())
        self.menuTitle.setTitle("File")
        self.actionQuit.setText("Quit")
        self.actionQuit.setShortcut("Ctrl+Q")
        self.actionQuit.triggered.connect(self.quitSelected)

        return

    # --- verbose function to create the help tab -----------------------------------------
    def _make_help(self, parent):
        """Fill the given parent widget with a read-only rich-text help page."""
        vbox = QtWidgets.QVBoxLayout(parent)
        hbox = QtWidgets.QHBoxLayout()
        vbox.addLayout(hbox)
        text = QtWidgets.QTextEdit()
        hbox.addWidget(text)

        # The leading <a name="top"> element defines the named anchor which is
        # the target of the scrollToAnchor() call below.
        text.insertHtml("""
<style type="text/css">
table { margin-left: 20px; }
td { padding-left: 20px; }
</style>
<a name="top"></a>
<h1>IDeATe MQTT Plotter</h1>

<p>This Python application is a tool intended for visualizing the content of
multiple data streams passed as short numeric messages back and forth across the
network via a MQTT server. It supports opening an authenticated connection to
the server, subscribing to a class of messages in order to receive them, viewing
message traffic, and plotting the received messages as points in a dynamically
updated graphic.</p>

<h2>Connecting</h2>

<p>The first set of controls configures server parameters before attempting a
connection. Changes will not take effect until the next connection attempt.</p>

<dl>
<dt>server address</dt><dd>The network name of the MQTT server. (Defaults to mqtt.ideate.cmu.edu.)</dd>
<dt>server port</dt><dd>The numeric port number for the MQTT server. IDeATe is
using a separate server for each course, so the drop-down menu also identifies
the associated course number.</dd>
<dt>username</dt><dd>Server-specific identity, chosen by your instructor.</dd>
<dt>password</dt><dd>Server-specific password, chosen by your instructor.</dd>
</dl>

<p>Your username and password is specific to the MQTT server and will be
provided by your instructor. This may be individual or may be a shared login for
all students in the course. Please note, the password will not be your Andrew
password.</p>

<h2>Listening</h2>

<p>MQTT works on a publish/subscribe model in which messages are published on
<i>topics</i> identified by a topic name. The name is structured like a path
string separated by <tt>/</tt> characters to organize messages into a hierarchy
of topics and subtopics. Our course policy will be to prefix all topics with a
student andrew ID, e.g. if your user name is xyzzy, we ask that you publish on
the 'xyzzy' topic and sub-topics, as per the following examples.</p>

<p>
<table>
<tr><td><b>xyzzy</b></td><td>top-level topic on which user 'xyzzy' should publish</td></tr>
<tr><td><b>xyzzy/status</b></td><td>a sub-topic on which user 'xyzzy' could publish</td></tr>
<tr><td><b>xyzzy/sensor</b></td><td>another sub-topic on which user 'xyzzy' could publish</td></tr>
<tr><td><b>xyzzy/sensor/1</b></td><td>a possible sub-sub-topic</td></tr>
</table>
</p>

<p>The message subscription field specifies topics to receive. The subscription
may include a # character as a wildcard, as per the following examples.</p>

<p><table>
<tr><td><b>#</b></td><td>subscribe to all messages (typical for this application)</td></tr>
<tr><td><b>xyzzy</b></td><td>subscribe to the top-level published messages for user xyzzy</td></tr>
<tr><td><b>xyzzy/#</b></td><td>subscribe to all published messages for user xyzzy, including subtopics</td></tr>
</table>
</p>

<p>Changing the subscription field immediately changes what is received; the
monitor unsubscribes from the previous pattern and subscribes to the new one.
Entering an empty field defaults to the global pattern '#'.</p>

<p>The large text field is the console area which shows both debugging and
status log messages as well as received messages.</p>

<h2>Data Format</h2>

<p>Each message received is processed as plain text integer numbers separated by
spaces. Either two or five value messages are supported: with two values, they
are interpreted as the X and Y location ("X Y"), with five values as a position
and an RGB color ("X Y R G B"). All values may range from 0 to 100 inclusive.
The topic name is used to identify the point, so multiple messages on the same
topic will dynamically move a plot point. Some sample messages follow.</p>

<p><table>
<tr><td><b>0 0</b></td><td>move the point to the lower left corner</td></tr>
<tr><td><b>50 50</b></td><td>move the point to the center</td></tr>
<tr><td><b>100 50 100 0 0</b></td><td>move to right edge and paint it red</td></tr>
<tr><td><b>75 75 0 0 100</b></td><td>move near upper right and paint it blue</td></tr>
</table>
</p>

<h2>More Information</h2>

<p>The IDeATE server has more detailed information on the server help page at <b>https://mqtt.ideate.cmu.edu</b></p>
""")
        text.scrollToAnchor("top")
        text.setReadOnly(True)
        return

    # --- logging to screen -------------------------------------------------------------
    def enable_console_logging(self):
        """Attach a root logging handler which writes into the console text area."""
        # get the root logger to receive all logging traffic
        logger = logging.getLogger()

        # create a logging handler which writes to the console window via self.write
        handler = logging.StreamHandler(self)
        handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
        logger.addHandler(handler)
        # logger.setLevel(logging.NOTSET)
        logger.setLevel(logging.DEBUG)
        # logger.setLevel(logging.WARNING)
        handler.setLevel(logging.NOTSET)
        self._handler = handler
        log.info("Enabled logging in console window.")
        return

    def disable_console_logging(self):
        """Detach the console logging handler, if one is installed."""
        if self._handler is not None:
            logging.getLogger().removeHandler(self._handler)
            self._handler = None

    # --- window and qt event processing -------------------------------------------------------------
    def show_status(self, string):
        """Display the given text in the status bar."""
        self.statusbar.showMessage(string)

    def _poll_console_queue(self):
        """Write any queued console text to the console text area from the main thread."""
        while not self.console_queue.empty():
            string = str(self.console_queue.get())
            # drop trailing whitespace and skip lines which are then empty
            stripped = string.rstrip()
            if stripped != "":
                self.consoleOutput.appendPlainText(stripped)
        return

    def write(self, string):
        """Write output to the console text area in a thread-safe way.  Qt only allows
        calls from the main thread, but the service routines run on separate threads."""
        self.console_queue.put(string)
        return

    def quitSelected(self):
        """Handle the File/Quit menu action by closing the window."""
        self.write("User selected quit.")
        self.close()

    def closeEvent(self, event):
        """Notify the delegate that the application is exiting, then close normally."""
        self.write("Received window close event.")
        self.main.app_is_exiting()
        self.disable_console_logging()
        super(MainGUI,self).closeEvent(event)

    def set_connected_state(self, flag):
        """Update the connection indicator label; only the value True shows as connected."""
        if flag is True:
            self.connected.setText("  Connected   ")
            self.connected.setStyleSheet("color: white; background-color: green;")
        else:
            self.connected.setText(" Not Connected ")
            self.connected.setStyleSheet("color: white; background-color: blue;")

    # --- GUI widget event processing ----------------------------------------------------------------------
    def mqtt_server_name_entered(self):
        """Forward an edited server name to the delegate."""
        name = self.mqtt_server_name.text()
        self.write("Server name changed: %s" % name)
        self.main.set_server_name(name)

    def decode_port_selection(self):
        """Return the selected port as an integer, or None for the blank entry."""
        title = self.port_selector.currentText()
        if title == "":
            return None
        else:
            return int(title.split()[0])  # convert the first token to a number

    def mqtt_port_selected(self, title):
        """Forward a new port selection to the delegate."""
        portnum = self.decode_port_selection()
        self.write("Port selection changed: %s" % title)
        self.main.set_server_port(portnum)

    def mqtt_username_entered(self):
        """Forward an edited user name to the delegate."""
        name = self.mqtt_username.text()
        self.write("User name changed: %s" % name)
        self.main.set_username(name)

    def mqtt_password_entered(self):
        """Forward an edited password to the delegate."""
        name = self.mqtt_password.text()
        self.write("Password changed: %s" % name)
        self.main.set_password(name)

    def connection_requested(self):
        """Push all connection fields to the delegate and request a connection."""
        # When the connect button is pressed, make sure all fields are up to
        # date.  It is otherwise possible to leave a text field selected with
        # unreceived changes while pressing Connect.
        hostname = self.mqtt_server_name.text()
        portnum  = self.decode_port_selection()
        username = self.mqtt_username.text()
        password = self.mqtt_password.text()

        self.main.set_server_name(hostname)
        self.main.set_server_port(portnum)
        self.main.set_username(username)
        self.main.set_password(password)

        self.main.connect_to_mqtt_server()

    def mqtt_sub_entered(self):
        """Forward an edited subscription pattern; an empty field becomes '#'."""
        sub = self.mqtt_sub.text()
        if sub == '':
            self.mqtt_sub.setText("#")
            sub = "#"

        self.write("Subscription changed to: %s" % sub)
        self.main.set_subscription(sub)
################################################################
class MainApp(QtCore.QObject):
    """Main application object holding any non-GUI related state.

    Owns the persistent settings, the MQTT client, and the main window.  The
    MQTT callbacks are invoked on the network thread, so anything which touches
    Qt widgets is forwarded to the main thread through the Qt signals declared
    below.
    """

    # class variable with Qt signal used to communicate between network thread and main thread
    _messageReceived = QtCore.pyqtSignal(str, bytes, name='_messageReceived')

    # Qt signal carrying (connected flag, status bar text) from the network
    # thread to the main thread, where the widgets are updated.
    _connectionChanged = QtCore.pyqtSignal(bool, str, name='_connectionChanged')

    def __init__(self):
        super(MainApp,self).__init__()

        # Attach a handler to the keyboard interrupt (control-C).
        signal.signal(signal.SIGINT, self._sigint_handler)

        # load any available persistent application settings
        QtCore.QCoreApplication.setOrganizationName("IDeATe")
        QtCore.QCoreApplication.setOrganizationDomain("ideate.cmu.edu")
        QtCore.QCoreApplication.setApplicationName('mqtt_plotter')
        self.settings = QtCore.QSettings()

        # uncomment to restore 'factory defaults'
        # self.settings.clear()

        # MQTT server settings.  The stored port is normalized to an integer
        # (or None) since the settings store may return it as text.
        self.hostname = self.settings.value('mqtt_host', 'mqtt.ideate.cmu.edu')
        self.portnum  = self._coerce_port(self.settings.value('mqtt_port', None))
        self.username = self.settings.value('mqtt_user', 'students')
        self.password = self.settings.value('mqtt_password', '(not yet entered)')

        # The default subscription is the global wildcard; the hash mark matches all topics.
        self.subscription = self.settings.value('mqtt_subscription', '#')

        # create the interface window
        self.window = MainGUI(self)

        # Initialize the MQTT client system
        self.client = mqtt.Client()
        self.client.enable_logger(mqtt_log)
        self.client.on_log = self.on_log
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.tls_set()

        # Connect the signals used to transfer received messages and connection
        # state changes from the network server thread to the main thread.
        self._messageReceived.connect(self.process_message)
        self._connectionChanged.connect(self._update_connection_display)

        self.window.show_status("Please set the MQTT server address and select Connect.")
        return

    @staticmethod
    def _coerce_port(value):
        """Return value converted to an integer port number, or None if it is unset or not numeric."""
        if value is None:
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    ################################################################
    def app_is_exiting(self):
        """Disconnect from the server if needed and stop the network thread."""
        if self.client.is_connected():
            self.client.disconnect()
            self.client.loop_stop()

    def _sigint_handler(self, signal, frame):
        """Handle control-C by shutting down the connection and exiting."""
        print("Keyboard interrupt caught, running close handlers...")
        self.app_is_exiting()
        sys.exit(0)

    ################################################################
    def set_server_name(self, name):
        """Set and persist the MQTT server host name."""
        self.hostname = name
        self.settings.setValue('mqtt_host', name)

    def set_server_port(self, value):
        """Set and persist the MQTT server port; None means no port is selected."""
        self.portnum = self._coerce_port(value)
        self.settings.setValue('mqtt_port', self.portnum)

    def set_username(self, name):
        """Set and persist the MQTT user name."""
        self.username = name
        self.settings.setValue('mqtt_user', name)

    def set_password(self, name):
        """Set and persist the MQTT password."""
        self.password = name
        self.settings.setValue('mqtt_password', name)

    def connect_to_mqtt_server(self):
        """Start an asynchronous connection attempt using the current settings."""
        if self.client.is_connected():
            self.window.write("Already connected.")
            return

        if self.portnum is None:
            log.warning("Please specify the server port before attempting connection.")
            return

        log.debug("Initiating MQTT connection to %s:%d", self.hostname, self.portnum)
        self.window.write("Attempting connection.")
        self.client.username_pw_set(self.username, self.password)
        self.client.connect_async(self.hostname, self.portnum)
        self.client.loop_start()

    def disconnect_from_mqtt_server(self):
        """Disconnect from the server if connected and stop the network thread."""
        if self.client.is_connected():
            self.client.disconnect()
        else:
            self.window.write("Not connected.")
        self.client.loop_stop()

    ################################################################
    @QtCore.pyqtSlot(bool, str)
    def _update_connection_display(self, connected, status):
        """Update the status bar and connection indicator; runs on the main thread."""
        self.window.show_status(status)
        self.window.set_connected_state(connected)

    # The callback for when the broker responds to our connection request.
    # N.B. this function is called from the network server thread.
    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result and subscribe only if the broker accepted the connection."""
        self.window.write("Server responded to connection request with flags: %s, result code: %s" % (flags, rc))

        if rc == 0:
            log.info("Connection succeeded.")

            # Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
            client.subscribe(self.subscription)
            self._connectionChanged.emit(True, "Connected.")

        else:
            if 0 < rc < len(mqtt_rc_codes):
                log.warning("Connection failed with error: %s", mqtt_rc_codes[rc])
            else:
                log.warning("Connection failed with unknown error %d", rc)

            # A refused connection must not be displayed as connected.
            self._connectionChanged.emit(False, "Connection failed.")
        return

    # The callback for when the MQTT client library emits a log message.
    def on_log(self, client, userdata, level, buf):
        """Forward a client library log message to the application log."""
        log.debug("on_log level %s: %s", level, buf)
        return

    # N.B. this function is called from the network server thread.
    def on_disconnect(self, client, userdata, rc):
        """Report that the connection to the server has ended."""
        log.debug("disconnected")
        self.window.write("Disconnected from server.")
        self._connectionChanged.emit(False, "Disconnected.")

    # The callback for when a message has been received on a topic to which this
    # client is subscribed.  The message variable is a MQTTMessage that describes
    # all of the message parameters.

    # Some useful MQTTMessage fields: topic, payload, qos, retain, mid, properties.
    #   The payload is a binary string (bytes).
    #   qos is an integer quality of service indicator (0,1, or 2)
    #   mid is an integer message ID.

    # N.B. this function is called from the network server thread, but the actual
    # processing needs to happen on the main thread for graphic output.
    def on_message(self, client, userdata, msg):
        """Echo the message to the console and hand it to the main thread."""
        self.window.write("{%s} %s" % (msg.topic, msg.payload))
        self._messageReceived.emit(msg.topic, msg.payload)

    @QtCore.pyqtSlot(str, bytes)
    def process_message(self, topic, payload):
        """Update the particle plot from a received message; runs on the main thread.

        :param topic: message topic, used as the particle name
        :param payload: message body as bytes holding whitespace-separated integers
        """
        # Update the particle plotter.  The name of the particle is the topic
        # name, and the data updated depends on the message format.
        name = topic

        # Parse the message text by attempting to convert all tokens to integers.  The values
        # are scaled so that the default range is 0 to 100.
        tokens = payload.split()
        try:
            values = [(0.01 * int(x)) for x in tokens]
            if len(values) > 1:
                self.window.plot.update_particle_position(name, values[0:2])

            if len(values) > 4:
                self.window.plot.update_particle_color(name, values[2:5])

        except ValueError:
            log.warning("Error parsing message into numbers: %s %s", topic, payload)

        return

    ################################################################
    def set_subscription(self, sub):
        """Change the subscription pattern, persist it, and resubscribe if connected."""
        if self.client.is_connected():
            self.client.unsubscribe(self.subscription)
            try:
                self.client.subscribe(sub)
                self.subscription = sub
                self.settings.setValue('mqtt_subscription', sub)
            except ValueError:
                # the new pattern was rejected, so restore the previous subscription
                self.window.write("Invalid subscription string, not changed.")
                self.client.subscribe(self.subscription)
        else:
            self.subscription = sub
            self.settings.setValue('mqtt_subscription', sub)
################################################################
def main():
    """Create the Qt application and the application controller, then run the event loop."""

    # Optionally add an additional root log handler to stream messages to the
    # terminal console.  This is disabled; set the flag to True to enable it.
    stream_log_to_terminal = False
    if stream_log_to_terminal:
        terminal_handler = logging.StreamHandler()
        terminal_handler.setLevel(logging.DEBUG)
        terminal_handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
        logging.getLogger().addHandler(terminal_handler)

    # initialize the Qt system itself
    qt_app = QtWidgets.QApplication(sys.argv)

    # create the main application controller; the local reference keeps it
    # alive for as long as the event loop runs
    controller = MainApp()

    # run the event loop until the user is done, then exit with its result code
    log.info("Starting event loop.")
    exit_code = qt_app.exec_()
    sys.exit(exit_code)

################################################################
# Main script follows.  This sequence is executed when the script is initiated from the command line.

if __name__ == "__main__":
    main()