Source code for stage.dmxusbpro
"""dmxusbpro.py
Class to manage a connection to a serial-connected Enttec DMXUSB Pro
interface. Requires pySerial and numpy.
"""
import logging
import os.path
import numpy as np
# This requires a pySerial installation.
# Package details: https://pypi.python.org/pypi/pyserial
# Documentation: http://pythonhosted.org/pyserial/
import serial
# initialize logging for this module
log = logging.getLogger(__name__)
#================================================================
[docs]
class DMXUSBPro:
"""Class to manage a connection to a serial-connected Enttec DMXUSB Pro
interface. This only supports output.
:param port: the name of the serial port device
:param verbose: flag to increase console output
:param debug: flag to print raw inputs on sconsole
:param kwargs: collect any unused keyword arguments
"""
def __init__(self, port=None, verbose=False, debug=False, universe_size=25, **kwargs ):
# Initialize a default universe. This publicly readable and writable.
# The Enttec requires a minimum universe size of 25.
self.universe = np.zeros((universe_size), dtype=np.uint8)
# Initialize internal state.
self.verbose = verbose
self.debug = debug
self.portname = port
self.port = None
self.output = None
self.input = None
return
[docs]
def is_connected(self):
"""Return true if the serial port device is open."""
return self.port is not None
[docs]
def set_serial_port_name(self, name):
"""Set the name of the serial port device."""
self.portname = name
return
[docs]
def open_serial_port(self):
"""Open the serial connection to the controller."""
if not os.path.exists(self.portname):
log.warning("Serial port %s not available, running offline.", self.portname)
return
# open the serial port
self.port = serial.Serial( self.portname, 115200 )
if self.verbose:
log.info("Opened serial port named %s", self.port.name)
# save separate copies of the file object; this will ease simulation using other sources
self.output = self.port
self.input = self.port
return
[docs]
def close_serial_port(self):
"""Shut down the serial connection, after which this object may no longer be used."""
if self.port is not None:
if self.verbose:
log.info("Closing port.")
self.port.close()
self.port = None
return
[docs]
def send_universe(self):
"""Issue a DMX universe update."""
if self.output is None:
if self.verbose:
log.debug("DMX port not open: would send %s", self.universe)
else:
message = np.ndarray((6 + self.universe.size), dtype=np.uint8)
message[0:2] = [126, 6] # Send DMX Packet header
message[2] = (self.universe.size+1) % 256 # data length LSB
message[3] = (self.universe.size+1) >> 8 # data length MSB
message[4] = 0 # zero 'start code' in first universe position
message[5:5+self.universe.size] = self.universe
message[-1] = 231 # end of message delimiter
if self.verbose:
log.debug("Sending: '%s'", message)
self.output.write(message.tobytes())
return