Theater Lighting Server

The following script implements an OSC network server and a cubic spline interpolator that generates output for a set of DMX fixtures. The hardware serial port I/O is handled by the Theater DMX Host Interface.

  1#!/usr/bin/env python3
  2
  3# The best way to start this server is from the parent folder using:
  4#   python3 -m stage.lighting_server --quiet
  5
# One-line program description; passed to argparse as the parser description
# in the script section at the bottom of this file.
cmd_desc = "Communicate real-time lighting commands received via OSC to the DMX hardware."
  7
  8import argparse
  9import time
 10import math
 11import logging
 12import threading
 13
 14import numpy as np
 15
 16# import the pythonosc package
 17from pythonosc import dispatcher
 18from pythonosc import osc_server
 19
 20# import the DMXUSBPro interface
 21from . import dmxusbpro
 22
 23# OSC port for lighting commands
 24from .config import lighting_UDP_port, lighting_host_IP, fixtures
 25
 26# common logging functions
 27from .logconfig import add_logging_args, open_log_file, configure_logging
 28
# Module-level logger named 'lighting'; used by the event loop, the OSC
# message handlers, and the script section below.
log = logging.getLogger('lighting')
 31
 32#================================================================
 33class FixtureSpline:
 34    """Representation of a single DMX lighting fixture animated using cubic
 35    Bezier splines.  For dimmer packs this is typically a single channel
 36    representing a single monochrome lamp, for RGB or RGBA fixtures this
 37    operates three or four channels.  This stores DMX channel assignments for
 38    convenience but uses an external object to write the current universe.
 39    """
 40    def __init__(self, channels=1, DMX_base=0):
 41
 42        # save the DMX address of the first channel
 43        self.DMX_base = DMX_base
 44
 45        # Initialize a default spline buffer.  The invariant is that this will
 46        # always retain at least one segment.
 47        self.knots = np.zeros((4, channels))
 48        self.u     = 1.0
 49        self.rate  = 1.0   # execution rate in segments per sec
 50
 51        # Current interpolated value.
 52        self.value = np.zeros((channels))
 53
 54    def set_tempo(self, spm):
 55        """Set the execution rate of the spline interpolator in units of
 56        segments/minute.  The initial default value is 60 segs/min (1 per
 57        second)."""
 58        self.rate = max(min(spm/60.0, 5), 0.0001)
 59
 60    def channels(self):
 61        """Return the number of channels required by the fixture.  E.g., each
 62        dimmer pack output is one channel, an RGB fixture has three, an RGBW
 63        fixture has four."""
 64        return self.knots.shape[1]
 65
 66    def update(self, dt, dmx):
 67        """Advance the spline position given the elapsed interval dt (in
 68        seconds), recalculate the spline interpolation, and send data to the
 69        given dmx universe object.
 70        """
 71        if dt > 0.0:
 72            self.u += self.rate * dt
 73            #  Determine whether to roll over to the next segment or not.  This will
 74            #  always leave one segment remaining.
 75            segs = (self.knots.shape[0]-1) // 3
 76            if segs > 1:
 77                if self.u >= 1.0:
 78                    # u has advanced to next segment, drop the first segment by dropping the first three knots
 79                    self.knots = self.knots[3:]
 80                    self.u -= 1.0
 81
 82        # always limit u to the first active segment
 83        self.u = min(max(self.u, 0.0), 1.0)
 84
 85        # update interpolation
 86        self._recalculate()
 87
 88        # update the DMX universe
 89        num_channels = self.knots.shape[1]
 90        dmx.universe[self.DMX_base:self.DMX_base+num_channels] = np.minimum(np.maximum(self.value, 0), 255).astype(np.uint8)
 91
 92    def _recalculate(self):
 93        """Apply the De Casteljau algorithm to calculate the position of
 94        the spline at the current path point."""
 95
 96        # unroll the De Casteljau iteration and save intermediate points
 97        num_channels = self.knots.shape[1]
 98        beta = np.ndarray((3, num_channels))
 99
100        # note: could use more numpy optimizations
101        u = self.u
102        for k in range(3):
103            beta[k] = self.knots[k] * (1 - u) + self.knots[k+1] * u
104
105        for k in range(2):
106            beta[k] =  beta[k] * (1 - u) +  beta[k+1] * u;
107
108        self.value[:] = beta[0] * (1 - u) + beta[1] * u
109
110    def set_target(self, value):
111        """Reset the path generator to a single spline segment from the current
112        value to the given value.  The path defaults to 'ease-out', meaning the
113        rate of change is discontinuous at the start and slows into the target
114
115        :param value: iterable with one DMX value per channel"""
116        self.knots[0,:] = self.value   # start at the current levels
117        self.knots[1,:] = value        # derivative discontinuity
118        self.knots[2,:] = value        # ease at the target
119        self.knots[3,:] = value        # target endpoint
120
121        # reset the path position to start the new segment
122        self.u = 0.0;
123
124    def set_value(self, value):
125        """Reset the path generator immediately to the given value with no
126        interpolation.
127
128        :param value: iterable with one DMX value per channel"""
129
130        self.knots[0,:] = value
131        self.knots[1,:] = value
132        self.knots[2,:] = value
133        self.knots[3,:] = value
134
135        # reset the path position to start the new segment
136        self.u = 1.0;
137
138        # reset current value
139        self.value[:] = value
140
141    def add_spline(self, knots):
142        """Append a sequence of Bezier spline segments to the lighting sequence
143        generator, provided as a numpy matrix with three rows per segment and
144        one column per channel.  The values are treated as DMX command values,
145        and may stray outside 0-255 even though the interpolated output will be
146        clamped."""
147
148        self.knots = np.vstack((self.knots, knots))
149
150        # Note: the path position and current value are left unchanged.  If the
151        # interpolator is at the end of the current segment, it will now begin
152        # to advance into these new segments.
153        return
154
155#================================================================
class EventLoop:
    """Lighting server core.  Owns one FixtureSpline per configured fixture,
    an OSC/UDP listener running on a background thread which edits those
    splines, and the DMX output object to which the periodic event loop
    writes the interpolated values.
    """

    def __init__(self, args):
        """Build the fixture interpolators, open the DMX port, and start the
        OSC listener.

        :param args: parsed command-line namespace; reads ``ip``, ``dmx``,
                     ``verbose`` and ``debug``
        """
        # configure event timing
        self.dt_ns = 40*1000*1000  # 25 Hz in nanoseconds per cycle

        # Keep usage counts for logging.
        self.spline_msg_count = 0
        self.fixture_msg_count = 0
        self.tempo_msg_count = 0

        # Create fade interpolators. The lock is used to synchronize access between
        # the networking thread and the main event loop thread.
        self.array_lock = threading.Lock()

        # Use the stage.config.fixtures array to configure the spline interpolators.
        self.splines = []
        self.fixture_map = {}
        for i, f in enumerate(fixtures):
            log.debug("Found fixture %d: %s", i, f)
            interpolator = FixtureSpline(channels=f.get('channels'), DMX_base=f.get('dmx'))
            self.splines.append(interpolator)
            self.fixture_map[f.get('name')] = interpolator

        # Note: despite the name, this is the number of fixtures.
        self.num_channels = len(self.splines)

        # Open the DMX controller serial port
        log.info("Opening DMX serial port.")
        self.dmx = dmxusbpro.DMXUSBPro(port=args.dmx, verbose=args.verbose, debug=args.debug)
        self.dmx.open_serial_port()

        # Create the networking listener.  This is done last because it starts
        # a thread whose handlers use the lock and fixture tables set up above.
        self.init_networking(args)

        return

    #---------------------------------------------------------------
    def set_targets(self, values):
        """Set all spline interpolation targets.

        :param values: iterable with one level per fixture, in fixture order;
                       each level is applied to every channel of its fixture.
                       Values beyond the number of fixtures are ignored."""
        with self.array_lock:
            for spline, value in zip(self.splines, values):
                spline.set_target([value])

    def set_values(self, values):
        """Immediately set all spline interpolator values.

        :param values: iterable with one level per fixture, in fixture order;
                       each level is applied to every channel of its fixture.
                       Values beyond the number of fixtures are ignored."""
        with self.array_lock:
            for spline, value in zip(self.splines, values):
                spline.set_value([value])

    #---------------------------------------------------------------
    def close(self):
        """Close the DMX serial port."""
        log.info("Closing serial ports.")
        self.dmx.close_serial_port()

    #---------------------------------------------------------------
    def init_networking(self, args):
        """Map the OSC addresses to their handlers and start the UDP server
        on a daemon thread.

        :param args: parsed command-line namespace; reads ``ip``
        """
        # Initialize the OSC message dispatch system.
        self.dispatch = dispatcher.Dispatcher()
        self.dispatch.map("/fixture",   self.lights_fixture)
        self.dispatch.map("/spline",    self.lights_spline)
        self.dispatch.map("/tempo",     self.lights_tempo)

        self.dispatch.set_default_handler(self.unknown_message)

        # Start and run the server.
        self.server = osc_server.OSCUDPServer((args.ip, lighting_UDP_port), self.dispatch)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        log.info("started OSC server on port %s:%d", args.ip, lighting_UDP_port)

    def _find_fixture(self, fixture):
        """Return the interpolator selected by an OSC fixture argument.

        The argument is first tried as a key of the fixture name table, and
        otherwise converted to an integer index into the fixture list.

        :raises ValueError, TypeError: if the argument is neither a known name
                                       nor convertible to an integer
        :raises IndexError: if the index is out of range
        """
        interpolator = self.fixture_map.get(fixture)
        if interpolator is None:
            interpolator = self.splines[int(fixture)]
        return interpolator

    def unknown_message(self, msgaddr, *args):
        """Default handler for unrecognized OSC messages."""
        log.warning("Received unmapped OSC message %s: %s", msgaddr, args)

    def lights_fixture(self, msgaddr, *args):
        """Process /fixture messages received via OSC over UDP.  The first value
        is treated as a fixture index or name, followed by one or more DMX
        values specifying the target level or color.  The fixture starts a
        smooth transition to the specified value.

        Values are limited to 0-255.  Either a single value (applied to every
        channel) or exactly one value per channel must be supplied.  Malformed
        messages are logged and ignored.
        """
        try:
            values = np.clip(np.asarray(args[1:], dtype=float), 0, 255)
            interpolator = self._find_fixture(args[0])

            # Reject a bad value count here, before the fixture is touched.
            if values.size not in (1, interpolator.channels()):
                raise ValueError("wrong number of values for fixture")

            with self.array_lock:
                interpolator.set_target(values)

            self.fixture_msg_count += 1

        except Exception:
            log.warning("OSC message failed for /fixture: %s", args)
            log.debug("/fixture handler exception", exc_info=True)

    def lights_spline(self, msgaddr, *args):
        """Process /spline messages received via OSC over UDP.  The first
        message value specifies a fixture number, followed by a row-major array
        representing a matrix of Bezier spline knots.  The matrix has one column
        per fixture channel and three rows per spline segment, and specifies DMX
        output values.  The implicit first knot is the current fixture state.

        Trailing values which do not complete a segment are ignored.  Messages
        which are malformed or contain no complete segment are logged and
        ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])

            # calculate the inferred matrix properties, ignoring excess values
            channels     = interpolator.channels()
            num_rows     = len(args[1:]) // channels
            num_segments = num_rows // 3
            if num_segments == 0:
                raise ValueError("no complete spline segment in message")
            num_used     = 3*num_segments*channels

            # Convert the usable args into a float knot matrix.  Forcing the
            # dtype rejects non-numeric values here, in the network thread,
            # instead of letting them reach the interpolator.
            knots = np.array(args[1:1+num_used], dtype=float).reshape(3*num_segments, channels)

            # and add to the fixture lighting trajectory
            with self.array_lock:
                interpolator.add_spline(knots)

            self.spline_msg_count += 1

        except Exception:
            log.warning("error processing OSC spline message: %s", args)
            log.debug("/spline handler exception", exc_info=True)

    def lights_tempo(self, msgaddr, *args):
        """Process /tempo messages received via OSC over UDP.  The first message
        value specifies a fixture, the second a spline execution rate in
        segments per minute.  Malformed messages are logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])
            tempo = float(args[1])

            # take the lock for consistency with the other handlers
            with self.array_lock:
                interpolator.set_tempo(tempo)

            self.tempo_msg_count += 1

        except Exception:
            log.warning("error processing OSC tempo message: %s", args)
            log.debug("/tempo handler exception", exc_info=True)

    #---------------------------------------------------------------
    # Event loop.
    def run(self):
        """Run the lighting server event loop to periodically update the
        spline interpolators and retransmit the DMX universe to the hardware.
        OSC UDP network messages are received by a background thread and
        applied to the interpolators to be processed by this loop.  This does
        not return; it exits only by exception (e.g. KeyboardInterrupt)."""

        dt_seconds = self.dt_ns * 1e-9

        # number of cycles between status log entries: five minutes of cycles
        status_interval = max((5 * 60 * 1000*1000*1000) // self.dt_ns, 1)

        next_cycle_t = time.monotonic_ns() + self.dt_ns
        event_cycles = 0

        while True:
            # wait for the next cycle timepoint, keeping the long
            # term rate stable even if the short term timing jitters
            delay = next_cycle_t - time.monotonic_ns()
            if delay > 0:
                time.sleep(delay * 1e-9)
            next_cycle_t += self.dt_ns

            # advance every spline by one nominal cycle and write the
            # interpolated levels into the DMX universe
            with self.array_lock:
                for spline in self.splines:
                    spline.update(dt_seconds, self.dmx)

            # and send to the hardware
            self.dmx.send_universe()

            # the following is not necessary given the output-only usage of the
            # Enttec device, and also appears to be triggering a FTDI driver bug on
            # recent versions of macOS:
            # self.dmx.flush_serial_input()

            # periodically log status; the lock is not held so that logging
            # cannot delay the network thread
            if event_cycles % status_interval == 0:
                log.debug("event cycle %d: fixture msgs: %d, spline msgs: %d, tempo msgs: %d",
                          event_cycles,
                          self.fixture_msg_count,
                          self.spline_msg_count,
                          self.tempo_msg_count)
            event_cycles += 1
364
365#================================================================
366# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # set up logging
    open_log_file('logs/lighting-server.log')
    log.info("Starting lighting-server.py")
    np.set_printoptions(linewidth=np.inf)

    # Initialize the command parser.
    parser = argparse.ArgumentParser(description=cmd_desc)
    parser.add_argument('--ip', default=lighting_host_IP, help="Network interface address (default: %(default)s).")
    parser.add_argument('--dmx', default='/dev/ttyUSB0', help='DMX serial port device (default: %(default)s).')
    add_logging_args(parser)

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Modify logging settings as per common arguments.
    configure_logging(args)

    # Create the event loop.
    event_loop = EventLoop(args)

    # Begin the performance.  This may be safely interrupted by the user pressing Control-C.
    try:
        event_loop.run()

    except KeyboardInterrupt:
        log.info("User interrupted operation.")
        print("User interrupt, shutting down.")

    finally:
        # Release the serial port however the loop ended, including on an
        # unexpected exception, which then continues to propagate.
        event_loop.close()

    log.info("Exiting lighting-server.py")