Source code for rcp.dmx

"""Objects related to DMX lighting control.
"""
################################################################
# Written in 2019 by Garth Zeglin <garthz@cmu.edu>

# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.

# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

################################################################
# standard Python libraries
import math, logging
import numpy as np

# for documentation on the PyQt5 API, see http://pyqt.sourceforge.net/Docs/PyQt5/index.html
from PyQt5 import QtCore, QtSerialPort

# set up logger for module
log = logging.getLogger('dmx')

# filter out most logging; the default is NOTSET which passes along everything
# log.setLevel(logging.INFO)

################################################################
[docs] class QtDMXUSBPro(object): """Class to manage a serial connection to an ENTTEC DMXUSB PRO for DMX lighting control output. Uses the Qt QSerialPort object for data transport. This currently only supports output, although the device is capable of receiving DMX. For details on the device, please see https://www.enttec.com/range/controls/dmx-usb-interfaces/ """ def __init__(self): self._portname = None self._port = None # Initialize a default DMX 'universe', i.e. an addressable space of # 8-bit registers. The Enttec requires a minimum universe size of 25. self._universe = np.zeros((25), dtype=np.uint8) return
[docs] def set_size(self, channels): """Set the size of the DMX 'universe'. The value is limited to the valid range of [25,512]. Each channel is an 8-bit value transmitted on the DMX bus; typically each fixture occupies several sequential channels at a start address configured on the fixture. """ new_size = min(max(channels, 25), 512) self._universe = np.resize(self._universe, new_size) log.info("Resized DMX universe to %d channels." % new_size) return
[docs] def available_ports(self): """Return a list of names of available serial ports.""" return [port.portName() for port in QtSerialPort.QSerialPortInfo.availablePorts()] return
def set_port(self, name): if name == "<no selection>": log.debug("User picked the null serial port entry.") self._portname = None else: self._portname = name
[docs] def open(self): """Open the serial port and initialize communications. If the port is already open, this will close it first. If the current name is None, this will not open anything. Returns True if the port is open, else False.""" if self._port is not None: self.close() if self._portname is None: log.debug("No port name provided so not opening port.") return False return self._port = QtSerialPort.QSerialPort() self._port.setBaudRate(115200) self._port.setPortName(self._portname) # open the serial port if self._port.open(QtCore.QIODevice.ReadWrite): log.info("Opened DMX serial port %s", self._port.portName()) # always process data as it becomes available self._port.readyRead.connect(self._read_input) return True else: # Error codes: https://doc.qt.io/qt-5/qserialport.html#SerialPortError-enum errcode = self._port.error() if errcode == QtSerialPort.QSerialPort.PermissionError: log.warning("Failed to open DMX serial port %s with a QSerialPort PermissionError, which could involve an already running control process, a stale lock file, or dialout group permissions.", self._port.portName()) else: log.warning("Failed to open DMX serial port %s with a QSerialPort error code %d.", self._port.portName(), errcode) self._port = None return False
def set_and_open_port(self, name): self.set_port(name) self.open()
[docs] def close(self): """Shut down the serial connection to the DMX device.""" if self._port is not None: log.info("Closing DMX serial port %s", self._port.portName()) self._port.close() self._port = None return
# === internals ======================================================= def _read_input(self): # Read as much input as available; callback from Qt event loop. data = self._port.readAll() if len(data) > 0: log.debug("Received %d bytes from DMX interface." % len(data)) return def _send_universe(self): """Issue a DMX universe update.""" if self._port is None: log.debug("DMX port not open for output during send.") else: universe_size = min(512, self._universe.size) message = np.ndarray((6 + universe_size), dtype=np.uint8) message[0:2] = [126, 6] # Send DMX Packet header message[2] = (universe_size+1) % 256 # data length LSB message[3] = (universe_size+1) >> 8 # data length MSB message[4] = 0 # zero 'start code' in first universe position message[5:5+universe_size] = self._universe[0:universe_size] message[-1] = 231 # end of message delimiter # log.debug("Sending to DMX: '%s'", message) self._port.write(message.tobytes()) # ================================================================
[docs] def set_channel(self, channel, value): """Set a single channel value and update the hardware. :param start: zero-based index of channel to update :param value: 8-bit integer value """ self._universe[channel] = value self._send_universe() return
[docs] def set_channels(self, start, values): """Set a range of channels and update the hardware. :param start: zero-based index of first channel to update :param values: list or numpy array of 8-bit integer values """ size = min(self._universe.size - start, len(values)) self._universe[start:start+size] = values[0:size] self._send_universe() return
################################################################ class ColorInterpolator(object): def __init__(self, fixtures, channels_per_fixture): # opacity of cue updates self.alpha = 1.0 # transition time constant in seconds self.duration = 1.0 # array sizes self.fixtures = fixtures self.channels_per_fixture = channels_per_fixture # initialize the color vectors self.target_colors = np.zeros((self.fixtures, self.channels_per_fixture)) self.current_colors = np.zeros((self.fixtures, self.channels_per_fixture)) self.color_velocity = np.zeros((self.fixtures, self.channels_per_fixture)) # flag for culling null outputs self.colors_changed = False return def current_dmx_values(self): """Return a universe of 8-bit integer DMX values with the current colors mapped to the specific fixture channel configuration.""" return np.round(self.current_colors).astype(np.uint8).flatten() def current_rgb_values(self): """Return a list of 8-bit integer (red, green, blue) values with the current color for each fixture.""" return np.round(self.current_colors[:,0:3]).astype(np.uint8) def update_for_interval(self, interval): """Polling function to update internal state. Returns true if the DMX outputs should be updated.""" # compute any difference between actual and target colors errors = self.target_colors - self.current_colors if errors.any(): # calculate the maximum possible change, bound it to the actual error, then apply it delta = interval * self.color_velocity abserror = np.abs(errors) bounded_delta = np.minimum(abserror, np.maximum(-abserror, delta)) self.current_colors += bounded_delta self.colors_changed = True value = self.colors_changed self.colors_changed = False return value def set_color_target(self, fixture, color): """Update the color target for the given fixture using the current opacity. The output will change to the new targets over the current transition interval. Note that the opacity blends between the current color and the given color, not the current target and the given color. """ if fixture < self.fixtures: channels = min(len(color), self.channels_per_fixture) newcolor = np.array(color[0:channels]) blended = (self.alpha * newcolor) + ((1 - self.alpha) * self.current_colors[fixture,0:channels]) self.target_colors[fixture,0:channels] = blended difference = self.target_colors[fixture] - self.current_colors[fixture] duration = max(self.duration, 0.020) self.color_velocity[fixture] = difference / duration return def set_channel_target(self, fixture, channel, value): """Update a specific color channel target for the given fixture using the current opacity. The output will change to the new target over the current transition interval. Note that the opacity blends between the current color and the given color, not the current target and the given color. """ if fixture < self.fixtures and channel < self.channels_per_fixture: blended = (self.alpha * value) + ((1 - self.alpha) * self.current_colors[fixture, channel]) self.target_colors[fixture,channel] = blended difference = blended - self.current_colors[fixture, channel] duration = max(self.duration, 0.020) self.color_velocity[fixture, channel] = difference / duration return def set_current_color(self, fixture, color): """Set a fixture to the given color without a transition. The next update cycle will output the color.""" if fixture < self.fixtures: channels = min(len(color), self.channels_per_fixture) self.target_colors [fixture, 0:channels] = color[0:channels] self.current_colors[fixture, 0:channels] = color[0:channels] self.color_velocity[fixture,:] = 0.0 self.colors_changed = True return def set_dmx_value(self, channel, value): """Set a single DMX channel to the given value without a transition. The next update cycle will output the color. This function performs the inverse of the fixture->DMX mapping in current_dmx_values(). """ fixture = channel // self.channels_per_fixture color = channel % self.channels_per_fixture if fixture < self.fixtures and color < self.channels_per_fixture: self.current_colors[fixture,color] = value self.target_colors[fixture,color] = value self.colors_changed = True return ################################################################