# Source code for theater.dmxusbpro

"""dmxusbpro.py

Class to manage a connection to a serial-connected Enttec DMXUSB Pro
interface.  Requires pySerial and numpy.
"""

import logging
import os.path
import numpy as np

# This requires a pySerial installation.
#  Package details: https://pypi.python.org/pypi/pyserial
#  Documentation:   http://pythonhosted.org/pyserial/
import serial

# initialize logging for this module
log = logging.getLogger(__name__)

#================================================================
class DMXUSBPro:
    """Manage a connection to a serial-connected Enttec DMXUSB Pro interface.

    This only supports output.

    :param port: the name of the serial port device
    :param verbose: flag to increase console output
    :param debug: flag to print raw inputs on console (stored on the
        instance; not read by any method of this class)
    :param universe_size: number of channels in the initial universe array
    :param kwargs: collect any unused keyword arguments
    """

    # Framing bytes used by send_universe() when building the packet.
    _START_OF_MESSAGE = 126  # first byte of every packet
    _LABEL_SEND_DMX = 6      # 'Send DMX Packet' message label
    _END_OF_MESSAGE = 231    # end of message delimiter

    def __init__(self, port=None, verbose=False, debug=False, universe_size=25, **kwargs):
        # Initialize a default universe.  This is publicly readable and
        # writable.  The Enttec requires a minimum universe size of 25.
        self.universe = np.zeros((universe_size), dtype=np.uint8)

        # Initialize internal state.
        self.verbose = verbose
        self.debug = debug
        self.portname = port
        self.port = None
        self.output = None
        self.input = None

    def is_connected(self):
        """Return true if the serial port device is open."""
        return self.port is not None

    def set_serial_port_name(self, name):
        """Set the name of the serial port device."""
        self.portname = name

    def open_serial_port(self):
        """Open the serial connection to the controller.

        If no port name has been set, or the named device does not exist,
        a warning is logged and the object stays offline (``port``,
        ``output`` and ``input`` are left untouched).
        """
        # A missing name must be checked first: os.path.exists(None) raises
        # TypeError rather than returning False.
        if self.portname is None or not os.path.exists(self.portname):
            log.warning("Serial port %s not available, running offline.", self.portname)
            return

        # open the serial port
        self.port = serial.Serial(self.portname, 115200)
        if self.verbose:
            log.info("Opened serial port named %s", self.port.name)

        # save separate copies of the file object; this will ease simulation
        # using other sources
        self.output = self.port
        self.input = self.port

    def flush_serial_input(self):
        """Clear the input buffer; does nothing when no input is attached."""
        if self.input is None:
            return

        # Prefer the current pySerial name and fall back to the legacy
        # flushInput() for older pySerial releases or simulated inputs.
        reset = getattr(self.input, "reset_input_buffer", None)
        if reset is None:
            reset = self.input.flushInput
        reset()

    def close_serial_port(self):
        """Shut down the serial connection.

        After closing, ``port`` is cleared, and ``output``/``input`` are
        cleared as well when they still refer to the closed port, so later
        calls fall back to the offline code paths instead of touching a
        closed device.  Substituted (e.g. simulated) streams are kept.
        """
        closing = self.port
        if closing is None:
            return

        if self.verbose:
            log.info("Closing port.")
        try:
            closing.close()
        finally:
            # Drop every reference to the closed port, even if close() raised.
            self.port = None
            if self.output is closing:
                self.output = None
            if self.input is closing:
                self.input = None

    def send_universe(self):
        """Issue a DMX universe update.

        The packet written to ``output`` is: start byte, 'Send DMX Packet'
        label, 16-bit little-endian data length, a zero start code, the
        universe bytes, and the end-of-message delimiter.  When no output
        is attached nothing is written (the universe is logged if verbose).
        """
        if self.output is None:
            if self.verbose:
                log.debug("DMX port not open: would send %s", self.universe)
            return

        channels = self.universe.size
        payload_length = channels + 1  # the start code counts as data

        # Every element is assigned below, so an uninitialized buffer is fine.
        message = np.empty(6 + channels, dtype=np.uint8)
        message[0] = self._START_OF_MESSAGE
        message[1] = self._LABEL_SEND_DMX
        message[2] = payload_length & 0xFF   # data length LSB
        message[3] = payload_length >> 8     # data length MSB
        message[4] = 0                       # zero 'start code' in first universe position
        message[5:5 + channels] = self.universe
        message[-1] = self._END_OF_MESSAGE

        if self.verbose:
            log.debug("Sending: '%s'", message)
        self.output.write(message.tobytes())