Theater Lighting Server

The following script implements an OSC network server and a cubic Bezier spline interpolator that generates output for a set of DMX fixtures. The hardware serial port I/O is handled by the Theater DMX Host Interface.

  1#!/usr/bin/env python3
  2
  3cmd_desc = "Communicate real-time lighting commands received via OSC to the DMX hardware."
  4
  5import argparse
  6import time
  7import math
  8import logging
  9import threading
 10
 11import numpy as np
 12
 13# import the pythonosc package
 14from pythonosc import dispatcher
 15from pythonosc import osc_server
 16
 17# import the DMXUSBPro interface
 18from theater import dmxusbpro
 19
 20# OSC port for lighting commands
 21from theater.config import lighting_UDP_port, lighting_host_IP, fixtures
 22
 23# import common logging functions
 24import theater.logging
 25
 26# initialize logging for this module
 27log = logging.getLogger('lighting')
 28
 29#================================================================
 30class FixtureSpline:
 31    """Representation of a single DMX lighting fixture animated using cubic
 32    Bezier splines.  For dimmer packs this is typically a single channel
 33    representing a single monochrome lamp, for RGB or RGBA fixtures this
 34    operates three or four channels.  This stores DMX channel assignments for
 35    convenience but uses an external object to write the current universe.
 36    """
 37    def __init__(self, channels=1, DMX_base=0):
 38
 39        # save the DMX address of the first channel
 40        self.DMX_base = DMX_base
 41
 42        # Initialize a default spline buffer.  The invariant is that this will
 43        # always retain at least one segment.
 44        self.knots = np.zeros((4, channels))
 45        self.u     = 1.0
 46        self.rate  = 1.0   # execution rate in segments per sec
 47
 48        # Current interpolated value.
 49        self.value = np.zeros((channels))
 50
 51    def set_tempo(self, spm):
 52        """Set the execution rate of the spline interpolator in units of
 53        segments/minute.  The initial default value is 60 segs/min (1 per
 54        second)."""
 55        self.rate = max(min(spm/60.0, 5), 0.0001)
 56
 57    def channels(self):
 58        """Return the number of channels required by the fixture.  E.g., each
 59        dimmer pack output is one channel, an RGB fixture has three, an RGBW
 60        fixture has four."""
 61        return self.knots.shape[1]
 62
 63    def update(self, dt, dmx):
 64        """Advance the spline position given the elapsed interval dt (in
 65        seconds), recalculate the spline interpolation, and send data to the
 66        given dmx universe object.
 67        """
 68        if dt > 0.0:
 69            self.u += self.rate * dt
 70            #  Determine whether to roll over to the next segment or not.  This will
 71            #  always leave one segment remaining.
 72            segs = (self.knots.shape[0]-1) // 3
 73            if segs > 1:
 74                if self.u >= 1.0:
 75                    # u has advanced to next segment, drop the first segment by dropping the first three knots
 76                    self.knots = self.knots[3:]
 77                    self.u -= 1.0
 78
 79        # always limit u to the first active segment
 80        self.u = min(max(self.u, 0.0), 1.0)
 81
 82        # update interpolation
 83        self._recalculate()
 84
 85        # update the DMX universe
 86        num_channels = self.knots.shape[1]
 87        dmx.universe[self.DMX_base:self.DMX_base+num_channels] = np.minimum(np.maximum(self.value, 0), 255).astype(np.uint8)
 88
 89    def _recalculate(self):
 90        """Apply the De Casteljau algorithm to calculate the position of
 91        the spline at the current path point."""
 92
 93        # unroll the De Casteljau iteration and save intermediate points
 94        num_channels = self.knots.shape[1]
 95        beta = np.ndarray((3, num_channels))
 96
 97        # note: could use more numpy optimizations
 98        u = self.u
 99        for k in range(3):
100            beta[k] = self.knots[k] * (1 - u) + self.knots[k+1] * u
101
102        for k in range(2):
103            beta[k] =  beta[k] * (1 - u) +  beta[k+1] * u;
104
105        self.value[:] = beta[0] * (1 - u) + beta[1] * u
106
107    def set_target(self, value):
108        """Reset the path generator to a single spline segment from the current
109        value to the given value.  The path defaults to 'ease-out', meaning the
110        rate of change is discontinuous at the start and slows into the target
111
112        :param value: iterable with one DMX value per channel"""
113        self.knots[0,:] = self.value   # start at the current levels
114        self.knots[1,:] = value        # derivative discontinuity
115        self.knots[2,:] = value        # ease at the target
116        self.knots[3,:] = value        # target endpoint
117
118        # reset the path position to start the new segment
119        self.u = 0.0;
120
121    def set_value(self, value):
122        """Reset the path generator immediately to the given value with no
123        interpolation.
124
125        :param value: iterable with one DMX value per channel"""
126
127        self.knots[0,:] = value
128        self.knots[1,:] = value
129        self.knots[2,:] = value
130        self.knots[3,:] = value
131
132        # reset the path position to start the new segment
133        self.u = 1.0;
134
135        # reset current value
136        self.value[:] = value
137
138    def add_spline(self, knots):
139        """Append a sequence of Bezier spline segments to the lighting sequence
140        generator, provided as a numpy matrix with three rows per segment and
141        one column per channel.  The values are treated as DMX command values,
142        and may stray outside 0-255 even though the interpolated output will be
143        clamped."""
144
145        self.knots = np.vstack((self.knots, knots))
146
147        # Note: the path position and current value are left unchanged.  If the
148        # interpolator is at the end of the current segment, it will now begin
149        # to advance into these new segments.
150        return
151
152#================================================================
class EventLoop:
    """Lighting server core: receives OSC commands on a background thread,
    applies them to per-fixture spline interpolators, and periodically renders
    the interpolated levels to the DMX hardware from the main thread.

    ``array_lock`` serializes access to the interpolators and message counters
    between the OSC server thread and the thread calling run().
    """

    def __init__(self, args):
        """Build the interpolators, open the DMX port and start the OSC listener.

        :param args: parsed command-line namespace; reads ``args.ip``,
            ``args.dmx``, ``args.verbose`` and ``args.debug``
        """
        # configure event timing
        self.dt_ns = 40*1000*1000  # 25 Hz in nanoseconds per cycle

        # Keep usage counts for logging.
        self.spline_msg_count = 0
        self.fixture_msg_count = 0
        self.tempo_msg_count = 0

        # Create fade interpolators. The lock is used to synchronize access between
        # the networking thread and the main event loop thread.
        self.array_lock = threading.Lock()

        # Use the theater.config.fixtures array to configure the spline interpolators.
        self.splines = []
        self.fixture_map = {}
        for i, f in enumerate(fixtures):
            log.debug("Found fixture %d: %s", i, f)
            interpolator = FixtureSpline(channels=f.get('channels'), DMX_base=f.get('dmx'))
            self.splines.append(interpolator)
            self.fixture_map[f.get('name')] = interpolator

        # Note: despite the name, this is the number of fixtures.
        self.num_channels = len(self.splines)

        # Open the DMX controller serial port
        log.info("Opening DMX serial port.")
        self.dmx = dmxusbpro.DMXUSBPro(port=args.dmx, verbose=args.verbose, debug=args.debug)
        self.dmx.open_serial_port()

        # Create the networking listener last: its thread starts dispatching
        # messages immediately, and the handlers rely on everything set up above.
        self.init_networking(args)

    #---------------------------------------------------------------
    def set_targets(self, values):
        """Set all spline interpolation targets.

        :param values: one level per fixture, in fixture order; each level is
            passed to the fixture as a single-element list
        """
        with self.array_lock:
            for i, value in enumerate(values):
                self.splines[i].set_target([value])

    def set_values(self, values):
        """Immediately set all spline interpolator values.

        :param values: one level per fixture, in fixture order; each level is
            passed to the fixture as a single-element list
        """
        with self.array_lock:
            for i, value in enumerate(values):
                self.splines[i].set_value([value])

    #---------------------------------------------------------------
    def close(self):
        """Close the DMX serial port."""
        log.info("Closing serial ports.")
        self.dmx.close_serial_port()

    #---------------------------------------------------------------
    def init_networking(self, args):
        """Create the OSC dispatcher and start the UDP server on a daemon thread.

        :param args: parsed command-line namespace; reads ``args.ip``
        """
        # Initialize the OSC message dispatch system.
        self.dispatch = dispatcher.Dispatcher()
        self.dispatch.map("/fixture",   self.lights_fixture)
        self.dispatch.map("/spline",    self.lights_spline)
        self.dispatch.map("/tempo",     self.lights_tempo)

        self.dispatch.set_default_handler(self.unknown_message)

        # Start and run the server.
        self.server = osc_server.OSCUDPServer((args.ip, lighting_UDP_port), self.dispatch)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        log.info("started OSC server on port %s:%d", args.ip, lighting_UDP_port)

    def _find_fixture(self, fixture):
        """Resolve a fixture argument to its interpolator.

        The argument is first tried as a fixture name and otherwise converted
        to an integer index into the fixture list.

        :raises ValueError, TypeError, IndexError: if the argument matches no fixture
        """
        interpolator = self.fixture_map.get(fixture)
        if interpolator is None:
            interpolator = self.splines[int(fixture)]
        return interpolator

    def unknown_message(self, msgaddr, *args):
        """Default handler for unrecognized OSC messages."""
        log.warning("Received unmapped OSC message %s: %s", msgaddr, args)

    def lights_fixture(self, msgaddr, *args):
        """Process /fixture messages received via OSC over UDP.  The first value
        is treated as a fixture index or name, followed by one or more DMX
        values specifying the target level or color.  The fixture starts a
        smooth transition to the specified value.  Malformed messages are
        logged and ignored.
        """
        try:
            # clamp the requested levels to the valid DMX range
            values = np.clip(np.array(args[1:], dtype=float), 0, 255)
            interpolator = self._find_fixture(args[0])

            with self.array_lock:
                interpolator.set_target(values)
                self.fixture_msg_count += 1

        except Exception:
            # Reject the message but keep serving.  Catching Exception rather
            # than everything lets KeyboardInterrupt and SystemExit propagate.
            log.warning("OSC message failed for /fixture: %s", args)
            log.debug("/fixture failure detail", exc_info=True)

    def lights_spline(self, msgaddr, *args):
        """Process /spline messages received via OSC over UDP.  The first
        message value specifies a fixture number or name, followed by a
        row-major array representing a matrix of Bezier spline knots.  The
        matrix has one column per fixture channel and three rows per spline
        segment, and specifies DMX output values.  The implicit first knot is
        the current fixture state.  Values beyond the last complete segment are
        ignored; malformed messages are logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])

            # calculate the inferred matrix properties, ignoring excess values
            channels     = interpolator.channels()
            num_rows     = (len(args) - 1) // channels
            num_segments = num_rows // 3
            if num_segments < 1:
                raise ValueError("fewer values than one complete spline segment")
            num_used     = 3*num_segments*channels

            # Convert the usable args into a knot matrix.  The explicit float
            # dtype rejects non-numeric arguments here, before they can reach
            # the knot buffer read by the render loop.
            knots = np.array(args[1:1+num_used], dtype=float).reshape(3*num_segments, channels)

            # and add to the fixture lighting trajectory
            with self.array_lock:
                interpolator.add_spline(knots)
                self.spline_msg_count += 1

        except Exception:
            log.warning("error processing OSC spline message: %s", args)
            log.debug("/spline failure detail", exc_info=True)

    def lights_tempo(self, msgaddr, *args):
        """Process /tempo messages received via OSC over UDP.  The first message
        value specifies a fixture, the second a spline execution rate in
        segments per minute.  Malformed messages are logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])
            tempo = float(args[1])

            # Take the same lock as the other handlers: the render loop reads
            # the rate while it is updating the interpolators.
            with self.array_lock:
                interpolator.set_tempo(tempo)
                self.tempo_msg_count += 1

        except Exception:
            log.warning("error processing OSC tempo message: %s", args)
            log.debug("/tempo failure detail", exc_info=True)

    #---------------------------------------------------------------
    # Event loop.
    def run(self):
        """Run the lighting server event loop to periodically update the spline
        interpolators and retransmit the DMX universe to the hardware.  OSC UDP
        network messages are received by a background thread and applied to the
        interpolators to be processed by this loop.  This method does not
        return; it is ended by an exception such as KeyboardInterrupt."""

        cycle_seconds = self.dt_ns * 1e-9
        status_period = 25*60*5          # cycles between status log entries
        next_cycle_t = time.monotonic_ns() + self.dt_ns
        event_cycles = 0

        while True:
            # wait for the next cycle timepoint, keeping the long
            # term rate stable even if the short term timing jitters
            delay = next_cycle_t - time.monotonic_ns()
            if delay > 0:
                time.sleep(delay * 1e-9)
            next_cycle_t += self.dt_ns

            # advance every spline by one nominal cycle and render it into
            # the DMX universe
            with self.array_lock:
                for spline in self.splines:
                    spline.update(cycle_seconds, self.dmx)

            # and send to the hardware
            self.dmx.send_universe()

            # Flushing the serial input (self.dmx.flush_serial_input()) is not
            # necessary given the output-only usage of the Enttec device, and
            # also appears to be triggering a FTDI driver bug on recent
            # versions of macOS, so it is deliberately not done here.

            # periodically log status
            if (event_cycles % status_period) == 0:
                with self.array_lock:
                    log.debug("event cycle %d: fixture msgs: %d, spline msgs: %d, tempo msgs: %d",
                              event_cycles,
                              self.fixture_msg_count,
                              self.spline_msg_count,
                              self.tempo_msg_count)
            event_cycles += 1
361
362#================================================================
363# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # set up logging
    theater.logging.open_log_file('logs/lighting-server.log')
    log.info("Starting lighting-server.py")
    np.set_printoptions(linewidth=np.inf)

    # Initialize the command parser.
    parser = argparse.ArgumentParser(description=cmd_desc)
    parser.add_argument('--ip', default=lighting_host_IP, help="Network interface address (default: %(default)s).")
    parser.add_argument('--dmx', default='/dev/ttyUSB0', help='DMX serial port device (default: %(default)s).')
    theater.logging.add_logging_args(parser)

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Modify logging settings as per common arguments.
    theater.logging.configure_logging(args)

    # Create the event loop.
    event_loop = EventLoop(args)

    # Begin the performance.  This may be safely interrupted by the user pressing Control-C.
    try:
        event_loop.run()

    except KeyboardInterrupt:
        log.info("User interrupted operation.")
        print("User interrupt, shutting down.")

    finally:
        # Release the serial port however the loop ended, not only on
        # Control-C; any other exception still propagates after the cleanup.
        event_loop.close()

    log.info("Exiting lighting-server.py")