# Source code for streaming.pb_previewer

#!/usr/bin/env python3
"""pb_previewer.py

Graphical viewer application for showing a representation of the Pausch Bridge
lights.  Uses python-osc to receive an image stream, uses PyQt6 to display a
bitmap.
"""
#-------------------------------------------------------------------------------
# standard Python libraries
import os, sys, logging, signal, configparser, argparse, time, datetime, subprocess

# This uses the PyQt6 library for the cross-platform GUI :
#   installation:      pip3 install PyQt6
#   pypi description:  https://pypi.org/project/PyQt6/
#   API documentation: https://www.riverbankcomputing.com/static/Docs/PyQt6/
from PyQt6 import QtCore        # https://doc.qt.io/qt-6
from PyQt6 import QtWidgets     # http://pyqt.sourceforge.net/Docs/PyQt5/index.html
from PyQt6 import QtNetwork
from PyQt6 import QtGui

# This uses the OpenCV computer vision library for video file input.
#   installation:      pip3 install opencv-python
#   pypi description:  https://pypi.org/project/opencv-python/
#   API documentation: https://docs.opencv.org/master/
import cv2 as cv                

# This uses the NumPy numerical library for image matrix manipulation.
#   installation:      pip3 install numpy
#   pypi description:  https://pypi.org/project/numpy
#   API documentation: https://numpy.org/doc/stable/
import numpy as np

# This uses python-osc to decode and dispatch received UDP packets whose data is encoded using OSC.
#   installation:      pip3 install python-osc
#   pypi description:  https://pypi.org/project/python-osc/
#   source code:       https://github.com/attwad/python-osc
from pythonosc import udp_client
from pythonosc import osc_bundle_builder
from pythonosc import osc_message_builder
import pythonosc.dispatcher

#-------------------------------------------------------------------------------
# set up logger for module
scriptname = os.path.splitext(os.path.basename(__file__))[0]
log = logging.getLogger(scriptname)

#-------------------------------------------------------------------------------
class QtFrameBuffer(object):
    """Image buffer usable both as a QPainter surface and as a numpy array.

    A QImage owns the pixel storage and a numpy ndarray view is created over
    the same bytes, so a change made through either object is visible through
    the other.

    :param width: frame buffer width in pixels, default is 400
    :param height: frame buffer height in pixels, default is 400
    :param pixel: string token identifying the pixel format; must be a key of
        ``pixel_properties`` (e.g. 'mono'), default is 'rgba'
    """

    # token : (qformat, nptype, pixeldims, pixelsize, samplesize)
    #   pixeldims  = number of pixel channels visible in numpy array shape
    #   pixelsize  = size in bytes per pixel
    #   samplesize = size in bytes per sample
    pixel_properties = {
        # Grayscale8: 8-bit mono, added in Qt 5.5
        'mono': (QtGui.QImage.Format.Format_Grayscale8, '|u1', 1, 1, 1),
        # preferred rendering target; ordering assumes little-endian
        'bgr': (QtGui.QImage.Format.Format_RGB32, '|u1', 3, 4, 1),
        'bgra': (QtGui.QImage.Format.Format_ARGB32, '|u1', 4, 4, 1),
        'rgb': (QtGui.QImage.Format.Format_RGB888, '|u1', 3, 3, 1),
        'rgba': (QtGui.QImage.Format.Format_RGBA8888, '|u1', 4, 4, 1),
    }

    def __init__(self, width=400, height=400, pixel='rgba'):
        super().__init__()

        # image dimensions in pixels
        self.height = height
        self.width = width

        # look up the storage layout for the requested pixel token
        layout = self.pixel_properties[pixel]
        qt_format, dtype_token, channels, bytes_per_pixel, bytes_per_sample = layout

        # the QImage is the drawing surface and the owner of the pixel bytes
        self.qimage = QtGui.QImage(self.width, self.height, qt_format)
        self.qimage.fill(0)

        # Describe the QImage storage via the numpy array interface so that
        # np.asarray() builds a view sharing the pixel bytes with the QImage.
        # N.B. self.npimage must never be rebound to a new array or the
        # sharing is lost; only its contents may be updated.
        interface = {
            'shape': (self.height, self.width, channels),
            'typestr': dtype_token,
            'data': self.qimage.bits().asarray(self.qimage.sizeInBytes()),
            'strides': (self.qimage.bytesPerLine(), bytes_per_pixel, bytes_per_sample),
            'version': 3,
        }
        self.qimage.__array_interface__ = interface
        self.npimage = np.asarray(self.qimage)
        # the attribute is only needed while the view is being constructed
        del self.qimage.__array_interface__

        # Clear the image data.  Image indices: [row, col], top-left is origin.
        self.npimage[:] = 0

    def get_default_painter(self):
        """Return a QPainter which has begun painting on the frame buffer.

        The painter has default image coordinates with +X to the right, +Y
        down, origin in the upper left, and drawing units in pixels.
        """
        painter = QtGui.QPainter()
        painter.begin(self.qimage)
        # painter.setRenderHint(QtGui.QPainter.Antialiasing)
        return painter

    def get_centered_painter(self):
        """Return a QPainter on the frame buffer with the origin translated
        to the center of the image (width/2, height/2)."""
        painter = QtGui.QPainter()
        painter.begin(self.qimage)
        painter.translate(self.width/2, self.height/2)
        return painter

    def get_np_frame_buffer(self):
        """Return a reference to the frame buffer as a numpy matrix.  This
        matrix shares storage with the QImage serving as the backing store."""
        return self.npimage

    def get_qt_frame_buffer(self):
        """Return a reference to the frame buffer as a QImage.  This object
        shares storage with a numpy matrix."""
        return self.qimage
################################################################
class QtImageWidget(QtWidgets.QWidget):
    """Widget to display an image.

    This keeps a fixed-size frame buffer as a backing store so the display is
    always able to be repainted, e.g. during resize operations.  The buffer
    can be used as a drawing surface by QPainter or updated directly using
    numpy operations.  The buffer image is scaled to the current widget size
    when repainting the display, so the pixel data may be higher or lower
    resolution than the currently visible display.

    :param width: frame buffer width in pixels, default is 400
    :param height: frame buffer height in pixels, default is 400
    :param pixel: string token identifying pixel format, default is 'rgb'
    """

    def __init__(self, width=400, height=400, pixel='rgb'):
        super().__init__()

        # configure the underlying widget; the minimum size follows the image
        # size but is capped at 600 pixels in each direction
        self.setMinimumSize(QtCore.QSize(min(width, 600), min(height, 600)))

        # configure a dark fill around the image
        pal = QtGui.QPalette()
        pal.setColor(QtGui.QPalette.ColorRole.Window, QtGui.QColor(64, 64, 64))
        self.setPalette(pal)
        self.setAutoFillBackground(True)

        # Create a frame buffer to use as a drawing surface.  Subclasses will
        # generally paint directly into this object.
        self.frame_buffer = QtFrameBuffer(width, height, pixel)

        # initialize the bounding box for drawing the image
        self._recalculate_image_bounds()

    def get_frame_buffer_painter(self):
        """Convenience function for drawing access to the frame buffer
        painter, returns a QPainter object.  The end() method of the returned
        object should never be called.
        """
        return self.frame_buffer.get_default_painter()

    def width(self):
        """Return the frame buffer width.

        Note that this shadows QWidget.width() for Python callers.
        """
        return self.frame_buffer.width

    def height(self):
        """Return the frame buffer height.

        Note that this shadows QWidget.height() for Python callers.
        """
        return self.frame_buffer.height

    def _recalculate_image_bounds(self):
        """Recompute the rectangle in widget pixels where the image is drawn.

        The image is scaled uniformly to fit inside the current widget
        geometry and centered.  Results are stored in draw_width, draw_height,
        draw_x_offset and draw_y_offset.  The drawn size is never smaller than
        one pixel in either direction so that widget_to_image_pos() cannot
        divide by zero.
        """
        geom = self.geometry()
        window_width = geom.width()
        window_height = geom.height()
        image_width = self.frame_buffer.width
        image_height = self.frame_buffer.height

        if window_width > 0 and window_height > 0:
            image_aspect = image_width / image_height
            window_aspect = window_width / window_height
            if image_aspect > window_aspect:
                # image is relatively wider than the window: width-limited
                scaling = window_width / image_width
            else:
                # image is relatively taller than the window: height-limited
                scaling = window_height / image_height
        else:
            # degenerate (zero-area) widget geometry: avoid dividing by zero
            scaling = 0.0

        # calculate the pixel coordinates of the image bounding rectangle as
        # drawn; int() truncation may yield zero for a very thin image, so
        # clamp to a one-pixel minimum
        self.draw_width = max(1, int(scaling * image_width))
        self.draw_height = max(1, int(scaling * image_height))
        self.draw_x_offset = int((window_width - self.draw_width) / 2)
        self.draw_y_offset = int((window_height - self.draw_height) / 2)

    def widget_to_image_pos(self, widget_x, widget_y):
        """Translate a widget pixel position into the frame buffer pixel
        coordinates.  This may return a position outside the frame buffer
        image.  Returns a tuple (x, y).
        """
        x = self.frame_buffer.width * (widget_x - self.draw_x_offset) / self.draw_width
        y = self.frame_buffer.height * (widget_y - self.draw_y_offset) / self.draw_height
        return x, y

    def paintEvent(self, e):
        """QtWidget callback to draw the content.  This paints the frame
        buffer onto the screen, scaled to fit within the current bounds.
        """
        # the widget may have been resized since the last paint
        self._recalculate_image_bounds()
        target = QtCore.QRect(self.draw_x_offset, self.draw_y_offset,
                              self.draw_width, self.draw_height)
        qp = QtGui.QPainter()
        qp.begin(self)
        qp.drawImage(target, self.frame_buffer.get_qt_frame_buffer())
        qp.end()
#-------------------------------------------------------------------------------
class QtPauschBridgeWidget(QtImageWidget):
    """Subclass the frame buffer display widget to draw the Pausch Bridge
    cartoon and manage user input.

    Keyboard events are decoded and forwarded to ``self.delegate`` (if set)
    via its key_press() and key_release() methods.  The widget can also save
    the frame buffer as a PNG image and stream frames to an ffmpeg subprocess
    for video recording.

    :param config: configparser object; 'frame_height' and 'frame_width' in
        the 'video' section select the frame buffer size (fallbacks 8 and 228)
    """

    def __init__(self, config):
        rows = config.getint('video', 'frame_height', fallback=8)
        cols = config.getint('video', 'frame_width', fallback=228)
        super().__init__(width=cols, height=rows, pixel='rgb')

        # show each image pixel at no less than 3x3 screen pixels
        self.setMinimumSize(QtCore.QSize(cols*3, rows*3))

        self.delegate = None

        # accept focus to enable keyboard events
        self.setFocusPolicy(QtCore.Qt.FocusPolicy.StrongFocus)
        # self.setMouseTracking(True)

        # video recording state
        self.recording = False
        self.video_path = None      # output file path of the current recording
        self.video_encoder = None   # ffmpeg subprocess.Popen object
        self.video_stream = None    # stdin pipe of the ffmpeg subprocess

        matrix = self.frame_buffer.get_np_frame_buffer()
        log.debug("bridge width buffer shape: %s", matrix.shape)

    # ================================================================
    def focusInEvent(self, e):
        log.debug("focusInEvent")

    def focusOutEvent(self, e):
        log.debug("focusOutEvent")

    def _qt_decode_keyboard_event(self, e):
        """Decode a Qt keyboard event into a dictionary of decoded properties.
        The results are redundant to support different interpretation schemes.

        Returns a dict with boolean 'shift', 'control', 'alt', 'meta' and
        'unmodified' entries plus the event 'text' and 'key' values.
        """
        # on macOS:
        #   the 'command' key is the ControlModifier
        #   the 'control' key selects button-2 and is MetaModifier
        #   the 'option' key is the AltModifier

        # e.modifiers() wraps an integer bitmask, exposed as .value
        modifiers = e.modifiers().value
        log.debug("Keyboard modifiers: %d", modifiers)

        modifier = QtCore.Qt.KeyboardModifier
        return {
            'shift': bool(modifiers & modifier.ShiftModifier.value),
            'control': bool(modifiers & modifier.ControlModifier.value),
            'alt': bool(modifiers & modifier.AltModifier.value),
            'meta': bool(modifiers & modifier.MetaModifier.value),
            'unmodified': (modifiers == modifier.NoModifier.value),
            'text': e.text(),
            'key': e.key(),
        }

    def keyPressEvent(self, e):
        """Forward a decoded key press to the delegate, if any."""
        event = self._qt_decode_keyboard_event(e)
        # log.info("keyPressEvent: %s", event)
        if self.delegate is not None:
            self.delegate.key_press(event)
        else:
            log.debug("Null delegate.")

    def keyReleaseEvent(self, e):
        """Forward the release of an unmodified key to the delegate, if any."""
        # for now, just pass along plain keys
        modifiers = int(e.modifiers().value)
        if modifiers == 0 and self.delegate is not None:
            self.delegate.key_release(e.key())

    def mousePressEvent(self, mouse):
        log.debug("mousePressEvent: %s", mouse)

    def mouseReleaseEvent(self, mouse):
        log.debug("mouseReleaseEvent: %s", mouse)

    def mouseMoveEvent(self, mouse):
        log.debug("mouseMoveEvent: %s", mouse)

    # ================================================================
    def _capture_frame(self):
        """Save the current frame buffer as a timestamped PNG in 'samples'."""
        # the output folder may not exist yet on a fresh checkout
        os.makedirs("samples", exist_ok=True)
        filename = scriptname + datetime.datetime.now().strftime("-%Y-%m-%d-%H-%M-%S.png")
        path = os.path.join("samples", filename)
        # QImage.save() reports failure through its boolean return value
        if self.frame_buffer.get_qt_frame_buffer().save(path, 'PNG'):
            log.info("Wrote sample image to %s", path)
        else:
            log.error("Failed to write sample image to %s", path)

    # ================================================================
    def toggle_video_recording(self):
        """Start recording if idle, otherwise stop the current recording."""
        if self.recording:
            self.close_video_stream()
        else:
            self.open_video_stream({'preset': 'avi', 'rate': 30})

    def open_video_stream(self, properties):
        """Launch an ffmpeg subprocess which encodes PNG frames read on stdin.

        :param properties: dict or None; 'rate' is the frame rate passed to
            ffmpeg (default 25) and 'preset' selects 'avi' (PNG video in an
            AVI wrapper) or anything else for h264 in MP4 (default 'mp4')

        Does nothing if a recording is already in progress.  If the encoder
        cannot be started the error is logged and the widget stays idle.
        """
        if self.recording:
            return

        if properties is None:
            properties = {}
        frame_rate = properties.get('rate', 25)
        preset = properties.get('preset', 'mp4')

        if preset == 'avi':
            # use MPNG in an AVI wrapper (lossless)
            options = ['ffmpeg', '-i', '-', '-c:v', 'png', '-r', '%d' % frame_rate, '-f', 'avi']
            extension = 'avi'
        else:
            # default to h264 in MP4 wrapper
            options = ["ffmpeg", "-i", "-", "-codec", "h264", "-r", "%d" % frame_rate,
                       "-vf", "format=yuv420p", "-profile:v", "high", "-level", "4.0"]
            extension = 'mp4'

        video_path = scriptname + datetime.datetime.now().strftime("-%Y-%m-%d-%H-%M-%S." + extension)
        try:
            encoder = subprocess.Popen(options + [video_path], stdin=subprocess.PIPE)
        except OSError as err:
            # e.g. ffmpeg is not installed; an exception escaping a Qt
            # callback would take down the whole application
            log.error("Unable to start ffmpeg for video output: %s", err)
            return

        self.video_path = video_path
        self.video_encoder = encoder
        self.video_stream = encoder.stdin
        self.recording = True
        log.info("Opened ffmpeg subprocess for video output to %s", self.video_path)

    def write_video_stream_frame(self):
        """Encode the frame buffer as PNG and send it to the video encoder.

        Does nothing unless a recording is in progress.  If the pipe to the
        encoder fails, the error is logged and the recording is stopped.
        """
        if not self.recording:
            return

        buffer = QtCore.QBuffer()
        buffer.open(QtCore.QIODevice.OpenModeFlag.WriteOnly)
        self.frame_buffer.get_qt_frame_buffer().save(buffer, 'PNG')
        png_bytes = buffer.data().data()
        buffer.close()
        log.debug("Generated QBuffer with PNG file in %d bytes.", len(png_bytes))

        try:
            self.video_stream.write(png_bytes)
        except OSError as err:
            # includes BrokenPipeError when the encoder has exited
            log.error("Video encoder pipe failed, stopping recording: %s", err)
            self.close_video_stream()
            return
        log.debug("Wrote video stream frame.")

    def close_video_stream(self):
        """Stop recording: close the encoder input and reap the subprocess."""
        if not self.recording:
            return
        self.recording = False

        try:
            # end-of-file on stdin tells the encoder to finish the file
            self.video_stream.close()
        except OSError as err:
            log.warning("Error closing video stream: %s", err)

        try:
            # reap the child so it does not linger as a zombie; bounded so a
            # stuck encoder cannot freeze the GUI indefinitely
            self.video_encoder.wait(timeout=5)
        except subprocess.TimeoutExpired:
            log.warning("Video encoder is still finishing %s.", self.video_path)

        self.video_stream = None
        self.video_encoder = None
        log.info("Closed video output subprocess.")
#-------------------------------------------------------------------------------
class AppWindow(QtWidgets.QMainWindow):
    """A custom main window which provides all GUI controls.

    This generally follows a model-view-controller convention in which this
    window provides the views, passing events to the application controller
    via callbacks.

    :param main: application controller; it is installed as the delegate of
        the bridge widget and its app_is_exiting() method is called when the
        window closes
    :param config: configuration object passed on to the bridge widget
    """

    def __init__(self, main, config):
        super().__init__()
        self.main = main
        self.config = config

        # build all widgets and menus, then make the window visible
        self._setupUi()
        self.show()

    # --------------------------------------------------------------------
    def _setupUi(self):
        """Create the layout, status bar, bridge widget and menus."""
        # basic window setup
        self.setWindowTitle("PB Lights")
        self.resize(900, 200)

        # central widget holding a single vertical layout
        self.centralwidget = QtWidgets.QWidget(self)
        self.setCentralWidget(self.centralwidget)
        self.verticalLayout = QtWidgets.QVBoxLayout(self.centralwidget)
        self.verticalLayout.setContentsMargins(-1, -1, -1, 9)  # left, top, right, bottom

        # status bar which appears at the bottom of the window
        self.statusbar = QtWidgets.QStatusBar(self)
        self.setStatusBar(self.statusbar)

        # bridge graphics area, which also can receive keyboard strokes
        self.cartoon = QtPauschBridgeWidget(self.config)
        self.cartoon.delegate = self.main  # direct delegate messages to MainApp
        self.verticalLayout.addWidget(self.cartoon)

        # main menu bar and its menus
        self.menubar = QtWidgets.QMenuBar(self)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 500, 22))
        self.menubar.setNativeMenuBar(False)
        self.menubar.setObjectName("menubar")
        self.setMenuBar(self.menubar)
        self._add_file_menu()
        self._add_record_menu()

    def _add_file_menu(self):
        """Add the File menu with its Quit action to the menu bar."""
        self.menuTitle = QtWidgets.QMenu(self.menubar)
        self.menuTitle.setTitle("File")

        self.actionQuit = QtGui.QAction(self)
        self.actionQuit.setText("Quit")
        self.actionQuit.setShortcut("Ctrl+Q")
        self.actionQuit.triggered.connect(self.quitSelected)

        self.menuTitle.addAction(self.actionQuit)
        self.menubar.addAction(self.menuTitle.menuAction())

    def _add_record_menu(self):
        """Add the Record menu with its Toggle action to the menu bar."""
        record_menu = QtWidgets.QMenu(self.menubar)
        record_menu.setTitle("Record")

        toggle_action = QtGui.QAction(self)
        toggle_action.setText("Toggle")
        toggle_action.setShortcut("Ctrl+R")
        toggle_action.triggered.connect(self.toggleRecording)

        record_menu.addAction(toggle_action)
        self.menubar.addAction(record_menu.menuAction())

    # --- window and qt event processing ---------------------------------
    def show_status(self, string):
        """Display a message in the status bar."""
        self.statusbar.showMessage(string)

    def quitSelected(self):
        """Menu callback: close the window."""
        log.info("User selected quit.")
        self.close()

    def toggleRecording(self):
        """Menu callback: start or stop video recording of the bridge widget."""
        log.info("User selected recording toggle.")
        self.cartoon.toggle_video_recording()

    def closeEvent(self, event):
        """Qt callback: notify the application controller, then close."""
        log.info("Main window received window close event.")
        self.main.app_is_exiting()
        super().closeEvent(event)
################################################################
class MainApp():
    """Main application object holding any non-GUI related state.

    Creates the main window, binds a UDP socket on the configured address and
    dispatches received OSC messages: '/pbridge/frame/size' is checked against
    the frame buffer size and '/pbridge/frame/data' is copied into the frame
    buffer and displayed.

    :param args: parsed command line arguments (not used by this class)
    :param config: configuration object; 'osc_host' and 'osc_port' are read
        from its 'network' section
    """

    def __init__(self, args, config):
        # Attach a handler to the keyboard interrupt (control-C).
        signal.signal(signal.SIGINT, self._sigint_handler)

        # Machine-specific persistent settings (QtCore.QSettings) are not
        # currently used; all properties come from the config object.
        self.config = config

        # create the interface window
        self.window = AppWindow(self, self.config)

        # Initialize the OSC message dispatch system.
        self.dispatcher = pythonosc.dispatcher.Dispatcher()
        self.dispatcher.map("/pbridge/frame/size", self.frame_metadata)
        self.dispatcher.map("/pbridge/frame/data", self.frame_data)
        self.dispatcher.set_default_handler(self.unknown_message)

        # bookkeeping used to detect a change of sender
        self.last_client_host = None
        self.last_client_port = None
        self.client_packet_count = 0

        # create the OSC server port
        self.listener_address = self.config['network']['osc_host']
        self.listener_portnum = int(self.config['network']['osc_port'])
        self.port = None
        self.open_receiver()

    # ================================================================
    # Manage the OSC UDP server port.
    def open_receiver(self):
        """(Re)create the UDP socket and bind it to the listener address.

        On failure the status bar shows an error and self.port is None.
        """
        if self.port is not None:
            self.port.close()

        self.port = QtNetwork.QUdpSocket()
        log.info("Creating OSC server port on %s, %d", self.listener_address, self.listener_portnum)

        success = self.port.bind(QtNetwork.QHostAddress(self.listener_address), self.listener_portnum)
        if not success:
            self.window.show_status("Failed to bind listener socket.")
            log.error("Failed to bind OSC listener socket.")
            self.port.close()
            self.port = None
        else:
            self.port.readyRead.connect(self.message_received)
            self.window.show_status("Ready to go, listening for OSC UDP packets on %s:%d..." % (self.listener_address, self.listener_portnum))
            log.info("OSC server port opened.")

    def message_received(self):
        """Qt callback: read every pending UDP datagram and dispatch it as OSC."""
        # Drain the socket: several datagrams may be waiting by the time the
        # readyRead signal is delivered.
        while self.port is not None and self.port.hasPendingDatagrams():
            # the host is an instance of QHostAddress
            msg, host, port = self.port.readDatagram(20000)
            client_host = host.toString()

            # detect new connections and reset the packet count
            if client_host != self.last_client_host or port != self.last_client_port:
                self.last_client_host = client_host
                self.last_client_port = port
                self.client_packet_count = 1
            else:
                self.client_packet_count += 1

            # only report occasionally to keep the log readable
            if (self.client_packet_count % 900) == 1:
                log.debug("Received UDP packet %d from %s port %d with %d bytes.",
                          self.client_packet_count, client_host, port, len(msg))

            self.dispatcher.call_handlers_for_packet(msg, host)

    def unknown_message(self, msgaddr, *args):
        """Default handler for unrecognized OSC messages."""
        log.info("OSC received at %s with %d args", msgaddr, len(args))

    def _frame_matrix(self):
        """Return the numpy view of the bridge widget frame buffer."""
        return self.window.cartoon.frame_buffer.get_np_frame_buffer()

    def frame_metadata(self, msgaddr, *args):
        """OSC handler for the frame size message.

        Logs a warning unless the arguments are (rows, cols, 3) matching the
        frame buffer dimensions (8 x 228 with the default configuration).
        """
        rows, cols = self._frame_matrix().shape[:2]
        if args != (rows, cols, 3):
            log.warning("Unsupported image size: %s", args)

    def frame_data(self, msgaddr, *args):
        """OSC handler for frame pixel data.

        The first argument must be a binary blob holding rows*cols*3 bytes,
        where rows and cols are the frame buffer dimensions.  The pixels are
        copied into the frame buffer, the widget is repainted and the frame
        is passed to the video recorder.  Malformed messages are logged and
        dropped rather than raising inside the Qt callback.
        """
        if not args or not isinstance(args[0], (bytes, bytearray, memoryview)):
            log.warning("Ignoring %s message without binary image data.", msgaddr)
            return

        pix = np.frombuffer(args[0], dtype=np.uint8)
        matrix = self._frame_matrix()
        rows, cols = matrix.shape[:2]
        if pix.size != rows * cols * 3:
            log.warning("Ignoring %s message with %d bytes, expected %d.",
                        msgaddr, pix.size, rows * cols * 3)
            return

        matrix[..., 0:3] = pix.reshape(rows, cols, 3)
        self.window.cartoon.repaint()
        self.window.cartoon.write_video_stream_frame()

    # ================================================================
    def app_is_exiting(self):
        """Called when the application is shutting down."""
        log.info("app_is_exiting")
        # here is where resources could be stopped

    def _sigint_handler(self, signal_number, stack_frame):
        """SIGINT handler: run the close handlers and exit."""
        # pylint: disable=unused-argument
        print("Keyboard interrupt caught, running close handlers...")
        self.app_is_exiting()
        sys.exit(0)

    # ================================================================
    # Callbacks from the QtPauschBridgeWidget
    def key_press(self, key_event):
        """Delegate callback: log a decoded key press event dictionary."""
        key = key_event['key']
        log.info("User key press: %s", key)

    def key_release(self, key):
        """Delegate callback: log a key release."""
        log.info("User key release: %s", key)
################################################################
def _main(args):
    """Configure the application from defaults, files and arguments, then
    run the Qt event loop until the user quits.

    :param args: argparse namespace with 'verbose', 'path' and 'configs'
    """
    # Create a configuration object with default settings.
    config = configparser.ConfigParser()
    config.read_dict({
        'log': {'file_log_level': '20',         # log INFO to file
                'console_log_level': '30'},     # log WARNING to console
        'video': {'frame_width': '228',
                  'frame_height': '8',
                  'frame_rate': '30'},
        'network': {'osc_host': '127.0.0.1',
                    'osc_port': '23432'},
    })

    # Load configuration file(s).
    configuration_file_path = scriptname + '.config'
    files_read = config.read([configuration_file_path] + args.configs)

    root_logger = logging.getLogger()

    # Add root log handler to stream messages to the terminal console.
    console_handler = logging.StreamHandler()
    console_level = int(config['log']['console_log_level'])
    if args.verbose:
        console_level = min(logging.DEBUG, console_level)
    console_handler.setLevel(console_level)
    console_handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s: %(message)s'))
    root_logger.addHandler(console_handler)

    # Add a file handler to the root logger.
    file_handler = logging.FileHandler(scriptname + '.log')
    file_handler.setLevel(int(config['log']['file_log_level']))
    file_handler.setFormatter(logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s',
                                                datefmt="%Y-%m-%d %H:%M:%S"))
    root_logger.addHandler(file_handler)

    # Set the root logger to the lowest requested log level.
    root_logger.setLevel(min(console_handler.level, file_handler.level))

    # Report on the configuration file results after logging is established.
    if files_read:
        log.info("Read configuration from %s", files_read)
    else:
        log.info("Unable to read configuration from %s", configuration_file_path)

    # Optionally save the merged configuration.
    if args.path is not None:
        log.info("Writing full configuration to %s", args.path)
        with open(args.path, "w") as output:
            config.write(output)

    # ---------------------------------------------------------------------
    # initialize the Qt system
    app = QtWidgets.QApplication(sys.argv)

    # create the main application controller; the reference keeps it alive
    # for the duration of the event loop
    main_app = MainApp(args, config)  # pylint: disable=unused-variable

    # run the event loop until the user is done
    log.info("Starting event loop.")
    sys.exit(app.exec())


################################################################
# Main script follows.  This sequence is executed when the script is
# initiated from the command line.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="""Display a Pausch Bridge video stream in a PyQt6 window.""")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help="Enable more detailed output.")
    parser.add_argument('-s', dest='path', type=str,
                        help="Save full configuration to given path.")
    parser.add_argument('configs', nargs="*",
                        help="Paths of additional configuration files.")

    args = parser.parse_args()
    _main(args)