#!/usr/bin/env python3
"""pb_previewer.py
Graphical viewer application for showing a representation of the Pausch Bridge
lights. Uses python-osc to receive an image stream, uses PyQt6 to display a
bitmap.
"""
#-------------------------------------------------------------------------------
# standard Python libraries
import os, sys, logging, signal, configparser, argparse, time, datetime, subprocess
# This uses the PyQt6 library for the cross-platform GUI :
# installation: pip3 install PyQt6
# pypi description: https://pypi.org/project/PyQt6/
# API documentation: https://www.riverbankcomputing.com/static/Docs/PyQt6/
from PyQt6 import QtCore # https://doc.qt.io/qt-6
from PyQt6 import QtWidgets # http://pyqt.sourceforge.net/Docs/PyQt5/index.html
from PyQt6 import QtNetwork
from PyQt6 import QtGui
# This uses the OpenCV computer vision library for video file input.
# installation: pip3 install opencv-python
# pypi description: https://pypi.org/project/opencv-python/
# API documentation: https://docs.opencv.org/master/
import cv2 as cv
# This uses the NumPy numerical library for image matrix manipulation.
# installation: pip3 install numpy
# pypi description: https://pypi.org/project/numpy
# API documentation: https://numpy.org/doc/stable/
import numpy as np
# This uses python-osc to decode OSC messages; the UDP packets themselves are received with a Qt socket.
# installation: pip3 install python-osc
# pypi description: https://pypi.org/project/python-osc/
# source code: https://github.com/attwad/python-osc
from pythonosc import udp_client
from pythonosc import osc_bundle_builder
from pythonosc import osc_message_builder
import pythonosc.dispatcher
#-------------------------------------------------------------------------------
# set up logger for module
# Name the module logger after this script: the file name of this module with
# its directory and its extension removed.
scriptname = os.path.splitext(os.path.split(__file__)[1])[0]
log = logging.getLogger(scriptname)
#-------------------------------------------------------------------------------
class QtFrameBuffer:
    """Image store usable both as a QPainter drawing surface and as a numpy array.

    One pixel buffer backs both the QImage and the ndarray, so a change made
    through either object is visible through the other.

    :param width:  frame buffer width in pixels, default is 400
    :param height: frame buffer height in pixels, default is 400
    :param pixel:  string token identifying pixel format (e.g. 'mono'); must be
                   a key of :attr:`pixel_properties`
    """

    # Each pixel token maps to (qformat, nptype, pixeldims, pixelsize, samplesize):
    #   qformat    = QImage format of the backing image
    #   nptype     = numpy typestr of one sample
    #   pixeldims  = number of pixel channels visible in numpy array shape
    #   pixelsize  = size in bytes per pixel
    #   samplesize = size in bytes per sample
    pixel_properties = {
        # Grayscale8: 8-bit mono, added in Qt 5.5
        'mono': (QtGui.QImage.Format.Format_Grayscale8, '|u1', 1, 1, 1),
        # preferred rendering target; ordering assumes little-endian
        'bgr':  (QtGui.QImage.Format.Format_RGB32,      '|u1', 3, 4, 1),
        'bgra': (QtGui.QImage.Format.Format_ARGB32,     '|u1', 4, 4, 1),
        'rgb':  (QtGui.QImage.Format.Format_RGB888,     '|u1', 3, 3, 1),
        'rgba': (QtGui.QImage.Format.Format_RGBA8888,   '|u1', 4, 4, 1),
    }

    def __init__(self, width=400, height=400, pixel='rgba'):
        super().__init__()

        # image dimensions in pixels
        self.width = width
        self.height = height

        # translate the pixel token into its storage layout
        image_format, sample_type, channels, pixel_bytes, sample_bytes = \
            self.pixel_properties[pixel]

        # the QImage serves as the drawing surface
        self.qimage = QtGui.QImage(self.width, self.height, image_format)
        self.qimage.fill(0)

        # Temporarily attach a numpy array interface description to the QImage
        # so that np.asarray() builds an ndarray sharing pixel storage with it.
        # N.B. self.npimage must never be rebound to a new array or the sharing
        # is lost; only its contents may be updated.
        raw_pixels = self.qimage.bits().asarray(self.qimage.sizeInBytes())
        row_stride = self.qimage.bytesPerLine()
        self.qimage.__array_interface__ = dict(
            shape=(self.height, self.width, channels),
            typestr=sample_type,
            data=raw_pixels,
            strides=(row_stride, pixel_bytes, sample_bytes),
            version=3,
        )
        self.npimage = np.asarray(self.qimage)
        del self.qimage.__array_interface__

        # Zero the pixels through the numpy view. Indexing is [row, col] with
        # the origin at the top left.
        self.npimage[:] = 0

    def get_default_painter(self):
        """Return a QPainter which can draw on the frame buffer. The painter has
        default image coordinates with +X to the right, +Y down, origin in the
        upper left, and drawing units in pixels.
        """
        painter = QtGui.QPainter()
        painter.begin(self.qimage)
        # qp.setRenderHint(QtGui.QPainter.Antialiasing)
        return painter

    def get_centered_painter(self):
        """Return a QPainter which can draw on the frame buffer, with the
        coordinate origin translated to the center of the image.
        """
        painter = QtGui.QPainter()
        painter.begin(self.qimage)
        painter.translate(self.width/2, self.height/2)
        return painter

    def get_np_frame_buffer(self):
        """Return a reference to the frame buffer as a numpy matrix. This matrix
        shares storage with the QImage serving as the backing store."""
        return self.npimage

    def get_qt_frame_buffer(self):
        """Return a reference to the frame buffer as a QImage. This object
        shares storage with a numpy matrix."""
        return self.qimage
################################################################
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
class AppWindow(QtWidgets.QMainWindow):
    """Main window holding every GUI control of the previewer.

    The window plays the view role of a model-view-controller arrangement: it
    builds the widgets and forwards user events to the application controller
    through callbacks.

    :param main:   application controller; this window calls its
                   app_is_exiting() method and installs it as the delegate of
                   the bridge widget
    :param config: configuration object handed on to the bridge widget
    """

    def __init__(self, main, config):
        super().__init__()

        # Keep the application controller (see MainApp) and the configuration.
        self.main = main
        self.config = config

        # Build the widgets, then make the window visible.
        self._setupUi()
        self.show()

    # ------------------------------------------------------------------------------------------------
    def _make_action(self, text, shortcut, handler):
        """Create a QAction owned by this window with the given label and
        keyboard shortcut, connected to the handler callable."""
        action = QtGui.QAction(self)
        action.setText(text)
        action.setShortcut(shortcut)
        action.triggered.connect(handler)
        return action

    def _setupUi(self):
        """Create the layout, status bar, bridge widget and menus."""
        # basic window properties
        self.setWindowTitle("PB Lights")
        self.resize(900, 200)

        # central widget with a vertical layout
        central = QtWidgets.QWidget(self)
        self.centralwidget = central
        self.setCentralWidget(central)
        layout = QtWidgets.QVBoxLayout(central)
        self.verticalLayout = layout
        layout.setContentsMargins(-1, -1, -1, 9)  # left, top, right, bottom

        # status bar along the bottom edge of the window
        self.statusbar = QtWidgets.QStatusBar(self)
        self.setStatusBar(self.statusbar)

        # bridge graphics area, which also can receive keyboard strokes;
        # its delegate messages are directed to the application controller
        self.cartoon = QtPauschBridgeWidget(self.config)
        self.cartoon.delegate = self.main
        layout.addWidget(self.cartoon)

        # main menu bar
        self.menubar = QtWidgets.QMenuBar(self)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 500, 22))
        self.menubar.setNativeMenuBar(False)
        self.menubar.setObjectName("menubar")
        self.setMenuBar(self.menubar)

        # File menu with a Quit entry
        self.menuTitle = QtWidgets.QMenu(self.menubar)
        self.menuTitle.setTitle("File")
        self.actionQuit = self._make_action("Quit", "Ctrl+Q", self.quitSelected)
        self.menuTitle.addAction(self.actionQuit)
        self.menubar.addAction(self.menuTitle.menuAction())

        # Record menu with a Toggle entry
        record_menu = QtWidgets.QMenu(self.menubar)
        record_menu.setTitle("Record")
        record_menu.addAction(self._make_action("Toggle", "Ctrl+R", self.toggleRecording))
        self.menubar.addAction(record_menu.menuAction())

    # --- window and qt event processing -------------------------------------------------------------
    def show_status(self, string):
        """Display the given text in the status bar."""
        self.statusbar.showMessage(string)

    def quitSelected(self):
        """Menu callback for File > Quit: close the window."""
        log.info("User selected quit.")
        self.close()

    def toggleRecording(self):
        """Menu callback for Record > Toggle: forward the request to the bridge widget."""
        log.info("User selected recording toggle.")
        self.cartoon.toggle_video_recording()

    def closeEvent(self, event):
        """Qt close-event hook: notify the application controller, then let
        the base class process the event."""
        log.info("Main window received window close event.")
        self.main.app_is_exiting()
        super().closeEvent(event)
################################################################
class MainApp():
    """Main application object holding any non-GUI related state.

    Creates the main window, the OSC message dispatcher and the UDP socket on
    which image frames are received.

    :param args:   parsed command-line arguments (not used by this class)
    :param config: configuration object; the 'network' section must provide
                   'osc_host' and 'osc_port'
    """

    # Geometry of the only supported image stream, as checked by
    # frame_metadata(): the three values announced on /pbridge/frame/size.
    FRAME_SHAPE = (8, 228, 3)

    # Maximum number of bytes read from a single UDP datagram.
    MAX_DATAGRAM_SIZE = 20000

    def __init__(self, args, config):
        # Attach a handler to the keyboard interrupt (control-C).
        signal.signal(signal.SIGINT, self._sigint_handler)

        # Load any machine-specific persistent application settings.
        # This should be limited to non-portable values such as I/O port selections.
        # Other properties should be stored in a config file.
        # QtCore.QCoreApplication.setOrganizationName("IDeATe")
        # QtCore.QCoreApplication.setOrganizationDomain("ideate.cmu.edu")
        # QtCore.QCoreApplication.setApplicationName('pb_previewer')
        # self.settings = QtCore.QSettings()
        # uncomment to restore 'factory defaults'
        # self.settings.clear()

        # save the configuration parser
        self.config = config

        # create the interface window
        self.window = AppWindow(self, self.config)

        # Initialize the OSC message dispatch system.
        self.dispatcher = pythonosc.dispatcher.Dispatcher()
        self.dispatcher.map("/pbridge/frame/size", self.frame_metadata)
        self.dispatcher.map("/pbridge/frame/data", self.frame_data)
        self.dispatcher.set_default_handler(self.unknown_message)

        # Client-tracking state read by message_received(); set it up before
        # the socket is opened so the handler never sees it missing.
        self.last_client_host = None
        self.last_client_port = None
        self.client_packet_count = 0

        # create the OSC server port
        self.listener_address = self.config['network']['osc_host']
        self.listener_portnum = int(self.config['network']['osc_port'])
        self.port = None
        self.open_receiver()

    #================================================================
    # Manage the OSC UDP server port.
    def open_receiver(self):
        """(Re)create the UDP socket and bind it to the configured address.

        Any previously opened socket is closed first. On failure the status
        bar and log report the error and self.port is left as None.
        """
        if self.port is not None:
            self.port.close()

        self.port = QtNetwork.QUdpSocket()
        log.info("Creating OSC server port on %s, %d", self.listener_address, self.listener_portnum)

        success = self.port.bind(QtNetwork.QHostAddress(self.listener_address), self.listener_portnum)
        if not success:
            self.window.show_status("Failed to bind listener socket.")
            log.error("Failed to bind OSC listener socket.")
            self.port.close()
            self.port = None
        else:
            self.port.readyRead.connect(self.message_received)
            self.window.show_status("Ready to go, listening for OSC UDP packets on %s:%d..." % (self.listener_address, self.listener_portnum))
            log.info("OSC server port opened.")

    def message_received(self):
        """Slot for the socket readyRead signal: read and dispatch datagrams.

        Every datagram pending on the socket is read, following the pattern in
        the Qt QUdpSocket documentation, so a burst of packets is handled in
        one call instead of one packet per signal.
        """
        while self.port is not None and self.port.hasPendingDatagrams():
            # the host is an instance of QHostAddress
            msg, host, port = self.port.readDatagram(self.MAX_DATAGRAM_SIZE)
            if msg is None:
                # defensive: stop rather than spin if a read yields no data
                break
            client_host = host.toString()

            # detect new connections and reset the packet count
            if client_host != self.last_client_host or port != self.last_client_port:
                self.last_client_host = client_host
                self.last_client_port = port
                self.client_packet_count = 1
            else:
                self.client_packet_count += 1

            # report only occasionally to keep the debug log readable
            if (self.client_packet_count % 900) == 1:
                log.debug("Received UDP packet %d from %s port %d with %d bytes.", self.client_packet_count, client_host, port, len(msg))

            self.dispatcher.call_handlers_for_packet(msg, host)

    def unknown_message(self, msgaddr, *args):
        """Default handler for unrecognized OSC messages."""
        log.info("OSC received at %s with %d args", msgaddr, len(args))

    def frame_metadata(self, msgaddr, *args):
        """Handler for /pbridge/frame/size: warn unless the announced image
        size equals FRAME_SHAPE."""
        # log.debug("OSC received %s: %s", msgaddr, args)
        if tuple(args) != self.FRAME_SHAPE:
            log.warning("Unsupported image size: %s", args)

    def frame_data(self, msgaddr, *args):
        """Handler for /pbridge/frame/data: copy one image into the frame
        buffer, repaint the bridge widget and pass the frame to its video
        stream writer.

        The first argument must be a binary blob holding exactly the number of
        bytes given by FRAME_SHAPE. Anything else is logged and ignored, so a
        malformed packet from the network cannot raise out of the Qt slot
        that delivered it.
        """
        # log.debug("OSC received %s with %d bytes", msgaddr, len(args[0]))
        if not args or not isinstance(args[0], (bytes, bytearray)):
            log.warning("Ignoring %s message without a binary payload.", msgaddr)
            return

        rows, cols, channels = self.FRAME_SHAPE
        expected_size = rows * cols * channels
        if len(args[0]) != expected_size:
            log.warning("Ignoring %s message with %d bytes, expected %d.", msgaddr, len(args[0]), expected_size)
            return

        pix = np.frombuffer(args[0], dtype=np.uint8)
        matrix = self.window.cartoon.frame_buffer.get_np_frame_buffer()
        # only the first three channels of the frame buffer are overwritten
        matrix[..., 0:channels] = pix.reshape(self.FRAME_SHAPE)
        self.window.cartoon.repaint()
        self.window.cartoon.write_video_stream_frame()

    #================================================================
    def app_is_exiting(self):
        """Called when the application is shutting down."""
        log.info("app_is_exiting")
        # here is where resources could be stopped

    def _sigint_handler(self, signal_number, stack_frame): # pylint: disable=unused-argument
        """SIGINT handler: run the exit hook, then terminate the process."""
        print("Keyboard interrupt caught, running close handlers...")
        self.app_is_exiting()
        sys.exit(0)

    #================================================================
    # Callbacks from the QtPauschBridgeWidget
    def key_press(self, key_event):
        """Delegate callback for a key press; key_event is a mapping with a 'key' entry."""
        key = key_event['key']
        log.info("User key press: %s", key)

    def key_release(self, key):
        """Delegate callback for a key release."""
        log.info("User key release: %s", key)
################################################################
def _main(args):
    """Configure the application from defaults, files and command-line
    arguments, set up logging, then run the Qt event loop until exit.

    :param args: parsed command-line arguments providing the attributes
                 configs, verbose and path
    """
    # Start from built-in defaults; configuration files may override them.
    config = configparser.ConfigParser()
    config.read_dict({
        'log': {
            'file_log_level': '20',      # log INFO to file
            'console_log_level': '30',   # log WARNING to console
        },
        'video': {'frame_width': '228', 'frame_height': '8', 'frame_rate': '30'},
        'network': {'osc_host': '127.0.0.1', 'osc_port': '23432'},
    })

    # Load the default configuration file followed by any files named on the
    # command line.
    configuration_file_path = scriptname + '.config'
    files_read = config.read([configuration_file_path] + args.configs)

    root_logger = logging.getLogger()

    # Console handler on the root logger; --verbose lowers its threshold to
    # DEBUG at most.
    console_level = int(config['log']['console_log_level'])
    if args.verbose:
        console_level = min(logging.DEBUG, console_level)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(console_level)
    console_handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
    root_logger.addHandler(console_handler)

    # File handler on the root logger, writing next to the current directory.
    log_path = scriptname + '.log'
    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(int(config['log']['file_log_level']))
    file_handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s', datefmt="%Y-%m-%d %H:%M:%S"))
    root_logger.addHandler(file_handler)

    # The root logger must pass everything either handler wants to see.
    root_logger.setLevel(min(console_handler.level, file_handler.level))

    # Report on the configuration files now that logging is established.
    if files_read:
        log.info("Read configuration from %s", files_read)
    else:
        log.info("Unable to read configuration from %s", configuration_file_path)

    # Optionally save the merged configuration.
    if args.path is not None:
        log.info("Writing full configuration to %s", args.path)
        with open(args.path, "w") as output:
            config.write(output)

    #-------------------------------------------------------------------------------
    # Initialize the Qt system.
    app = QtWidgets.QApplication(sys.argv)

    # Create the main application controller; the reference keeps it alive
    # while the event loop runs.
    main_app = MainApp(args, config) # pylint: disable=unused-variable

    # Run the event loop until the user is done.
    log.info("Starting event loop.")
    sys.exit(app.exec())
################################################################
# Main script follows. This sequence is executed when the script is initiated from the command line.
if __name__ == "__main__":
    # Describe the command-line interface, parse it and hand over to _main().
    parser = argparse.ArgumentParser(
        description="Display a Pausch Bridge video stream in a PyQt6 window.",
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help="Enable more detailed output.",
    )
    parser.add_argument(
        '-s',
        dest='path',
        type=str,
        help="Save full configuration to given path.",
    )
    parser.add_argument(
        'configs',
        nargs="*",
        help="Paths of additional configuration files.",
    )
    args = parser.parse_args()
    _main(args)