Theater Lighting Server
The following script implements an OSC network server and a cubic Bezier spline interpolator that generates output for a set of DMX fixtures. The hardware serial port I/O is handled by the Theater DMX Host Interface (`theater.dmxusbpro`).
1#!/usr/bin/env python3
2
3cmd_desc = "Communicate real-time lighting commands received via OSC to the DMX hardware."
4
5import argparse
6import time
7import math
8import logging
9import threading
10
11import numpy as np
12
13# import the pythonosc package
14from pythonosc import dispatcher
15from pythonosc import osc_server
16
17# import the DMXUSBPro interface
18from theater import dmxusbpro
19
20# OSC port for lighting commands
21from theater.config import lighting_UDP_port, lighting_host_IP, fixtures
22
23# import common logging functions
24import theater.logging
25
26# initialize logging for this module
27log = logging.getLogger('lighting')
28
29#================================================================
class FixtureSpline:
    """Representation of a single DMX lighting fixture animated using cubic
    Bezier splines.  For dimmer packs this is typically a single channel
    representing a single monochrome lamp, for RGB or RGBA fixtures this
    operates three or four channels.  This stores DMX channel assignments for
    convenience but uses an external object to write the current universe.

    The knot buffer holds ``3*segments + 1`` rows and one column per channel;
    consecutive segments share their boundary knot.  The buffer always retains
    at least one segment (four rows).
    """

    def __init__(self, channels=1, DMX_base=0):
        """Create an interpolator resting at zero on every channel.

        :param channels: number of DMX channels driven by the fixture
        :param DMX_base: index of the fixture's first channel in the universe
        """
        # save the DMX address of the first channel
        self.DMX_base = DMX_base

        # Initialize a default spline buffer.  The invariant is that this will
        # always retain at least one segment.
        self.knots = np.zeros((4, channels))
        self.u = 1.0       # position within the first segment, in [0, 1]
        self.rate = 1.0    # execution rate in segments per sec

        # Current interpolated value.
        self.value = np.zeros((channels))

    def set_tempo(self, spm):
        """Set the execution rate of the spline interpolator in units of
        segments/minute.  The initial default value is 60 segs/min (1 per
        second).  The stored rate is limited to 0.0001..5 segments/sec."""
        self.rate = max(min(spm/60.0, 5), 0.0001)

    def channels(self):
        """Return the number of channels required by the fixture.  E.g., each
        dimmer pack output is one channel, an RGB fixture has three, an RGBW
        fixture has four."""
        return self.knots.shape[1]

    def update(self, dt, dmx):
        """Advance the spline position given the elapsed interval dt (in
        seconds), recalculate the spline interpolation, and send data to the
        given dmx universe object.

        :param dt: elapsed time in seconds; non-positive values do not advance
        :param dmx: object whose ``universe`` array receives the clamped levels
        """
        if dt > 0.0:
            self.u += self.rate * dt

        # Roll over into following segments.  Each finished segment is removed
        # by dropping its first three knots (the fourth is the next segment's
        # start).  Looping keeps the path position correct even when a single
        # step spans more than one segment; one segment always remains.
        while self.u >= 1.0 and (self.knots.shape[0] - 1) // 3 > 1:
            self.knots = self.knots[3:]
            self.u -= 1.0

        # always limit u to the first active segment
        self.u = min(max(self.u, 0.0), 1.0)

        # update interpolation
        self._recalculate()

        # update the DMX universe with levels clamped to the 8-bit DMX range
        first = self.DMX_base
        last = first + self.knots.shape[1]
        dmx.universe[first:last] = np.clip(self.value, 0, 255).astype(np.uint8)

    def _recalculate(self):
        """Apply the De Casteljau algorithm to calculate the position of
        the spline at the current path point and store it in ``self.value``."""
        u = self.u

        # Repeatedly blend neighbouring control points of the first segment:
        # four knots -> three -> two -> one interpolated point.
        points = self.knots[:4]
        while points.shape[0] > 1:
            points = points[:-1] * (1 - u) + points[1:] * u

        self.value[:] = points[0]

    def set_target(self, value):
        """Reset the path generator to a single spline segment from the current
        value to the given value.  Any previously queued segments are
        discarded.  The path defaults to 'ease-out', meaning the rate of change
        is discontinuous at the start and slows into the target.

        :param value: iterable with one DMX value per channel (a single value
                      is broadcast to every channel)
        :raises ValueError: if value cannot be broadcast to the channel count;
                            the current path is left untouched in that case
        """
        # Validate/broadcast first so a bad value cannot leave a half-written path.
        target = np.empty_like(self.value)
        target[:] = value

        # Build a fresh four-knot buffer rather than patching the first rows of
        # the old one, so segments added via add_spline() do not play afterwards.
        self.knots = np.vstack((
            self.value,   # start at the current levels
            target,       # derivative discontinuity
            target,       # ease at the target
            target,       # target endpoint
        ))

        # reset the path position to start the new segment
        self.u = 0.0

    def set_value(self, value):
        """Reset the path generator immediately to the given value with no
        interpolation.  Any previously queued segments are discarded.

        :param value: iterable with one DMX value per channel (a single value
                      is broadcast to every channel)
        :raises ValueError: if value cannot be broadcast to the channel count
        """
        level = np.empty_like(self.value)
        level[:] = value

        # a single constant segment resting on the requested level
        self.knots = np.tile(level, (4, 1))

        # park the path position at the end of the segment
        self.u = 1.0

        # reset current value
        self.value[:] = level

    def add_spline(self, knots):
        """Append a sequence of Bezier spline segments to the lighting sequence
        generator, provided as a numpy matrix with three rows per segment and
        one column per channel.  The first knot of the appended path is the
        last knot already in the buffer.  The values are treated as DMX command
        values, and may stray outside 0-255 even though the interpolated output
        will be clamped.

        :param knots: array-like of shape (3*segments, channels)
        :raises ValueError: if the column count does not match the fixture or
                            the row count is not a multiple of three
        """
        new_knots = np.atleast_2d(np.asarray(knots, dtype=float))
        if new_knots.ndim != 2 or new_knots.shape[1] != self.knots.shape[1]:
            raise ValueError("spline knots must have one column per channel")
        if new_knots.shape[0] % 3 != 0:
            raise ValueError("spline knots must have three rows per segment")

        self.knots = np.vstack((self.knots, new_knots))

        # Note: the path position and current value are left unchanged.  If the
        # interpolator is at the end of the current segment, it will now begin
        # to advance into these new segments.
151
152#================================================================
class EventLoop:
    """Lighting server core: receives OSC messages on a background thread,
    applies them to per-fixture spline interpolators, and periodically writes
    the interpolated levels to the DMX hardware from the main thread.

    ``array_lock`` guards the interpolators, which are touched both by the OSC
    handler thread and by the event loop in run().
    """

    def __init__(self, args):
        """Build the interpolators, start the OSC listener and open the DMX port.

        :param args: parsed command-line namespace providing ``ip``, ``dmx``,
                     ``verbose`` and ``debug``
        """
        # configure event timing
        self.dt_ns = 40*1000*1000  # 25 Hz in nanoseconds per cycle

        # Keep usage counts for logging.
        self.spline_msg_count = 0
        self.fixture_msg_count = 0
        self.tempo_msg_count = 0

        # Create fade interpolators.  The lock is used to synchronize access
        # between the networking thread and the main event loop thread.  All
        # state used by the OSC handlers is created BEFORE the listener thread
        # is started so an early message cannot see a half-built object.
        self.array_lock = threading.Lock()

        # Use the theater.config.fixtures array to configure the spline interpolators.
        self.splines = []
        self.fixture_map = {}
        for i, f in enumerate(fixtures):
            log.debug("Found fixture %d: %s", i, f)
            name = f.get('name')
            base = f.get('dmx')
            channels = f.get('channels')
            interpolator = FixtureSpline(channels=channels, DMX_base=base)
            self.splines.append(interpolator)
            self.fixture_map[name] = interpolator

        # NOTE: despite the name this is the number of fixtures (interpolators).
        self.num_channels = len(self.splines)

        # Create the networking listener
        self.init_networking(args)

        # Open the DMX controller serial port
        log.info("Opening DMX serial port.")
        self.dmx = dmxusbpro.DMXUSBPro(port=args.dmx, verbose=args.verbose, debug=args.debug)
        self.dmx.open_serial_port()

    #---------------------------------------------------------------
    def set_targets(self, values):
        """Set all spline interpolation targets.

        :param values: one level per fixture, in fixture order; each level is
                       applied to every channel of its fixture
        """
        with self.array_lock:
            for i, value in enumerate(values):
                self.splines[i].set_target([value])

    def set_values(self, values):
        """Immediately set all spline interpolator values.

        :param values: one level per fixture, in fixture order; each level is
                       applied to every channel of its fixture
        """
        with self.array_lock:
            for i, value in enumerate(values):
                self.splines[i].set_value([value])

    #---------------------------------------------------------------
    def close(self):
        """Close the DMX serial port."""
        log.info("Closing serial ports.")
        self.dmx.close_serial_port()

    #---------------------------------------------------------------
    def init_networking(self, args):
        """Create the OSC dispatcher and start the UDP server on a daemon thread.

        :param args: parsed command-line namespace providing ``ip``
        """
        # Initialize the OSC message dispatch system.
        self.dispatch = dispatcher.Dispatcher()
        self.dispatch.map("/fixture", self.lights_fixture)
        self.dispatch.map("/spline", self.lights_spline)
        self.dispatch.map("/tempo", self.lights_tempo)

        self.dispatch.set_default_handler(self.unknown_message)

        # Start and run the server.
        self.server = osc_server.OSCUDPServer((args.ip, lighting_UDP_port), self.dispatch)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        log.info("started OSC server on port %s:%d", args.ip, lighting_UDP_port)

    def _find_fixture(self, fixture):
        """Return the interpolator addressed by an OSC fixture argument.

        The argument is tried first as a fixture name and then as a numerical
        index into the fixture list (Python list indexing rules apply).

        :raises IndexError, ValueError, TypeError: if no fixture matches
        """
        interpolator = self.fixture_map.get(fixture)
        if interpolator is None:
            interpolator = self.splines[int(fixture)]
        return interpolator

    def unknown_message(self, msgaddr, *args):
        """Default handler for unrecognized OSC messages."""
        log.warning("Received unmapped OSC message %s: %s", msgaddr, args)

    def lights_fixture(self, msgaddr, *args):
        """Process /fixture messages received via OSC over UDP.  The first value
        is treated as a fixture index or name, followed by one or more DMX
        values specifying the target level or color.  The fixture starts a
        smooth transition to the specified value.  Malformed messages are
        logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])

            # limit the requested levels to the DMX range
            values = np.clip(np.array(args[1:]), 0, 255)

            with self.array_lock:
                interpolator.set_target(values)

            self.fixture_msg_count += 1

        except Exception:
            # Exception (not a bare except) so interpreter-exit signals still propagate.
            log.warning("OSC message failed for /fixture: %s", args)
            log.debug("/fixture failure details", exc_info=True)

    def lights_spline(self, msgaddr, *args):
        """Process /spline messages received via OSC over UDP.  The first
        message value specifies a fixture index or name, followed by a
        row-major array representing a matrix of Bezier spline knots.  The
        matrix has one column per fixture channel and three rows per spline
        segment, and specifies DMX output values.  The segments are appended to
        the fixture's queued path, so the implicit first knot is the last knot
        already queued.  Excess values that do not fill a whole segment are
        ignored; a message without one complete segment is logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])

            # calculate the inferred matrix properties, ignoring excess values
            channels = interpolator.channels()
            num_values = len(args) - 1
            num_rows = num_values // channels
            num_segments = num_rows // 3
            if num_segments < 1:
                raise ValueError("no complete spline segment in message")
            num_used = 3*num_segments*channels

            # convert the usable args into a knot matrix
            knots = np.array(args[1:1+num_used]).reshape(3*num_segments, channels)

            # and add to the fixture lighting trajectory
            with self.array_lock:
                interpolator.add_spline(knots)

            self.spline_msg_count += 1

        except Exception:
            log.warning("error processing OSC spline message: %s", args)
            log.debug("/spline failure details", exc_info=True)

    def lights_tempo(self, msgaddr, *args):
        """Process /tempo messages received via OSC over UDP.  The first message
        value specifies a fixture index or name, the second a spline execution
        rate in segments per minute.  Malformed messages are logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])
            tempo = float(args[1])

            # same locking discipline as the other handlers
            with self.array_lock:
                interpolator.set_tempo(tempo)

            self.tempo_msg_count += 1

        except Exception:
            log.warning("error processing OSC tempo message: %s", args)
            log.debug("/tempo failure details", exc_info=True)

    #---------------------------------------------------------------
    # Event loop.
    def run(self):
        """Run the lighting server event loop to periodically advance the
        spline interpolators and retransmit the DMX universe to the hardware.
        OSC UDP network messages are received by a background thread and
        applied to the interpolators to be processed by this loop.  This
        method does not return; it is normally ended by an exception such as
        KeyboardInterrupt."""

        dt_seconds = self.dt_ns * 1e-9
        next_cycle_t = time.monotonic_ns() + self.dt_ns
        event_cycles = 0

        while True:
            # wait for the next cycle timepoint, keeping the long
            # term rate stable even if the short term timing jitters
            delay = next_cycle_t - time.monotonic_ns()
            if delay > 0:
                time.sleep(delay * 1e-9)
            next_cycle_t += self.dt_ns

            # advance every fixture by one nominal cycle and write its
            # levels into the DMX universe
            with self.array_lock:
                for spline in self.splines:
                    spline.update(dt_seconds, self.dmx)

            # and send to the hardware
            self.dmx.send_universe()

            # the following is not necessary given the output-only usage of the
            # Enttec device, and also appears to be triggering a FTDI driver bug on
            # recent versions of macOS:
            # self.dmx.flush_serial_input()

            # periodically log status (every 5 minutes at 25 Hz)
            if (event_cycles % (25*60*5)) == 0:
                log.debug("event cycle %d: fixture msgs: %d, spline msgs: %d, tempo msgs: %d",
                          event_cycles,
                          self.fixture_msg_count,
                          self.spline_msg_count,
                          self.tempo_msg_count)
            event_cycles += 1
361
362#================================================================
363# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # set up logging
    theater.logging.open_log_file('logs/lighting-server.log')
    log.info("Starting lighting-server.py")
    np.set_printoptions(linewidth=np.inf)

    # Initialize the command parser.
    parser = argparse.ArgumentParser(description=cmd_desc)
    parser.add_argument('--ip', default=lighting_host_IP, help="Network interface address (default: %(default)s).")
    parser.add_argument('--dmx', default='/dev/ttyUSB0', help='DMX serial port device (default: %(default)s).')
    theater.logging.add_logging_args(parser)

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Modify logging settings as per common arguments.
    theater.logging.configure_logging(args)

    # Create the event loop.
    event_loop = EventLoop(args)

    # Begin the performance.  This may be safely interrupted by the user
    # pressing Control-C.  The serial port is closed in the 'finally' clause so
    # it is released however run() ends; any exception other than the user
    # interrupt still propagates after the port has been closed.
    try:
        event_loop.run()

    except KeyboardInterrupt:
        log.info("User interrupted operation.")
        print("User interrupt, shutting down.")

    finally:
        event_loop.close()

    log.info("Exiting lighting-server.py")