Theater Lighting Server
The following script implements an OSC network server and a cubic spline interpolator that generates output for a set of DMX fixtures. The hardware serial port I/O is handled by the Theater DMX Host Interface.
1#!/usr/bin/env python3
2
3# The best way to start this server is from the parent folder using:
4# python3 -m stage.lighting_server --quiet
5
6cmd_desc = "Communicate real-time lighting commands received via OSC to the DMX hardware."
7
8import argparse
9import time
10import math
11import logging
12import threading
13
14import numpy as np
15
16# import the pythonosc package
17from pythonosc import dispatcher
18from pythonosc import osc_server
19
20# import the DMXUSBPro interface
21from . import dmxusbpro
22
23# OSC port for lighting commands
24from .config import lighting_UDP_port, lighting_host_IP, fixtures
25
26# common logging functions
27from .logconfig import add_logging_args, open_log_file, configure_logging
28
29# initialize logging for this module
30log = logging.getLogger('lighting')
31
32#================================================================
class FixtureSpline:
    """Representation of a single DMX lighting fixture animated using cubic
    Bezier splines.  For dimmer packs this is typically a single channel
    representing a single monochrome lamp, for RGB or RGBA fixtures this
    operates three or four channels.  This stores DMX channel assignments for
    convenience but uses an external object to write the current universe.

    The path is held in ``knots``, a matrix with one column per channel.  The
    first four rows are the control points of the active segment; every
    additional group of three rows is a queued segment which begins at the
    last knot of the segment before it.
    """

    def __init__(self, channels=1, DMX_base=0):
        """Create an idle interpolator with every channel at zero.

        :param channels: number of DMX channels operated by the fixture
        :param DMX_base: index in the DMX universe of the first channel
        """
        # save the DMX address of the first channel
        self.DMX_base = DMX_base

        # Initialize a default spline buffer.  The invariant is that this will
        # always retain at least one segment (four knots).
        self.knots = np.zeros((4, channels))
        self.u = 1.0      # position within the active segment, in [0, 1]
        self.rate = 1.0   # execution rate in segments per sec

        # Current interpolated value.
        self.value = np.zeros((channels))

    def set_tempo(self, spm):
        """Set the execution rate of the spline interpolator in units of
        segments/minute.  The initial default value is 60 segs/min (1 per
        second).  The resulting rate is limited to the range 0.0001 to 5
        segments per second."""
        self.rate = max(min(spm/60.0, 5), 0.0001)

    def channels(self):
        """Return the number of channels required by the fixture.  E.g., each
        dimmer pack output is one channel, an RGB fixture has three, an RGBW
        fixture has four."""
        return self.knots.shape[1]

    def update(self, dt, dmx):
        """Advance the spline position given the elapsed interval dt (in
        seconds), recalculate the spline interpolation, and send data to the
        given dmx universe object.

        :param dt: elapsed time in seconds; values <= 0 do not advance the path
        :param dmx: object whose ``universe`` attribute is an indexable buffer
                    of DMX levels; the fixture's channel slice is overwritten
        """
        if dt > 0.0:
            self.u += self.rate * dt

        # Roll over into queued segments.  Each completed segment is dropped
        # by discarding its first three knots, so the old endpoint becomes the
        # start of the next segment.  The loop carries any excess across more
        # than one segment if dt was long, and always leaves one segment.
        while self.u >= 1.0 and (self.knots.shape[0] - 1) // 3 > 1:
            self.knots = self.knots[3:]
            self.u -= 1.0

        # always limit u to the first active segment
        self.u = min(max(self.u, 0.0), 1.0)

        # update interpolation
        self._recalculate()

        # update the DMX universe, clamping to the valid DMX level range
        first = self.DMX_base
        levels = np.clip(self.value, 0, 255).astype(np.uint8)
        dmx.universe[first:first + self.channels()] = levels

    def _recalculate(self):
        """Apply the De Casteljau algorithm to calculate the position of
        the spline at the current path point, storing it in ``self.value``."""
        u = self.u

        # Repeatedly blend adjacent control points of the active segment:
        # four points reduce to three, then two, then the curve point.
        points = self.knots[:4]
        for _ in range(3):
            points = points[:-1] * (1 - u) + points[1:] * u

        self.value[:] = points[0]

    def _levels(self, value):
        """Return value expanded to a new array with one level per channel.

        Uses the same broadcasting rules as numpy row assignment, and raises
        ValueError for an incompatible shape before any path state is touched.
        """
        levels = np.empty(self.knots.shape[1])
        levels[:] = value
        return levels

    def set_target(self, value):
        """Reset the path generator to a single spline segment from the current
        value to the given value.  Any queued segments are discarded.  The path
        defaults to 'ease-out', meaning the rate of change is discontinuous at
        the start and slows into the target.

        :param value: iterable with one DMX value per channel
        :raises ValueError: if value cannot be broadcast to the channel count;
                            the existing path is left unchanged in that case
        """
        target = self._levels(value)

        # Rebuild the knot matrix rather than overwriting the first four rows
        # so that segments queued by add_spline() do not keep running after
        # the target is reached.
        self.knots = np.vstack((
            self.value,   # start at the current levels
            target,       # derivative discontinuity
            target,       # ease at the target
            target,       # target endpoint
        ))

        # reset the path position to start the new segment
        self.u = 0.0

    def set_value(self, value):
        """Reset the path generator immediately to the given value with no
        interpolation.  Any queued segments are discarded.

        :param value: iterable with one DMX value per channel
        :raises ValueError: if value cannot be broadcast to the channel count;
                            the existing path is left unchanged in that case
        """
        level = self._levels(value)

        # a single degenerate segment resting at the requested level
        self.knots = np.tile(level, (4, 1))

        # park the path position at the end of that segment
        self.u = 1.0

        # reset current value
        self.value[:] = level

    def add_spline(self, knots):
        """Append a sequence of Bezier spline segments to the lighting sequence
        generator, provided as a numpy matrix with three rows per segment and
        one column per channel.  The values are treated as DMX command values,
        and may stray outside 0-255 even though the interpolated output will be
        clamped.

        The path position and current value are left unchanged.  If the
        interpolator is at the end of the current segment, it will begin to
        advance into these new segments on the next update.
        """
        self.knots = np.vstack((self.knots, knots))
154
155#================================================================
class EventLoop:
    """Lighting server core: receives OSC messages on a background thread and
    periodically advances one FixtureSpline per configured fixture, writing
    the results to the DMX hardware interface.

    ``array_lock`` serializes access to the interpolators between the OSC
    handler thread and the thread executing run().
    """

    def __init__(self, args):
        """Build the interpolators, open the DMX port and start the OSC server.

        :param args: parsed command-line namespace; this reads ``args.dmx``,
                     ``args.verbose``, ``args.debug`` and ``args.ip``
        """
        # configure event timing
        self.dt_ns = 40*1000*1000  # 25 Hz in nanoseconds per cycle

        # Keep usage counts for logging.
        self.spline_msg_count = 0
        self.fixture_msg_count = 0
        self.tempo_msg_count = 0

        # Create fade interpolators.  The lock is used to synchronize access
        # between the networking thread and the main event loop thread.
        self.array_lock = threading.Lock()

        # Use the stage.config.fixtures array to configure the spline interpolators.
        self.splines = []
        self.fixture_map = {}
        for i, f in enumerate(fixtures):
            log.debug("Found fixture %d: %s", i, f)
            interpolator = FixtureSpline(channels=f.get('channels'), DMX_base=f.get('dmx'))
            self.splines.append(interpolator)
            self.fixture_map[f.get('name')] = interpolator

        # NOTE: despite the name, this is the number of fixtures.
        self.num_channels = len(self.splines)

        # Open the DMX controller serial port
        log.info("Opening DMX serial port.")
        self.dmx = dmxusbpro.DMXUSBPro(port=args.dmx, verbose=args.verbose, debug=args.debug)
        self.dmx.open_serial_port()

        # Create the networking listener.  This must come last: it starts the
        # handler thread, and the handlers use the lock and fixture tables
        # created above.
        self.init_networking(args)

    #---------------------------------------------------------------
    def set_targets(self, values):
        """Set all spline interpolation targets.

        :param values: one level per fixture, in fixture order; each level is
                       applied to every channel of its fixture
        """
        with self.array_lock:
            for i, value in enumerate(values):
                self.splines[i].set_target([value])

    def set_values(self, values):
        """Immediately set all spline interpolator values.

        :param values: one level per fixture, in fixture order; each level is
                       applied to every channel of its fixture
        """
        with self.array_lock:
            for i, value in enumerate(values):
                self.splines[i].set_value([value])

    #---------------------------------------------------------------
    def close(self):
        """Close the DMX serial port and stop the OSC server."""
        log.info("Closing serial ports.")
        self.dmx.close_serial_port()

        # Stop the listener thread and release its UDP socket.  shutdown()
        # blocks until serve_forever() has returned.
        self.server.shutdown()
        self.server.server_close()

    #---------------------------------------------------------------
    def init_networking(self, args):
        """Create the OSC dispatcher and start the UDP server on a daemon
        thread.

        :param args: parsed command-line namespace; ``args.ip`` selects the
                     network interface address to bind
        """
        # Initialize the OSC message dispatch system.
        self.dispatch = dispatcher.Dispatcher()
        self.dispatch.map("/fixture", self.lights_fixture)
        self.dispatch.map("/spline", self.lights_spline)
        self.dispatch.map("/tempo", self.lights_tempo)

        self.dispatch.set_default_handler(self.unknown_message)

        # Start and run the server.
        self.server = osc_server.OSCUDPServer((args.ip, lighting_UDP_port), self.dispatch)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        log.info("started OSC server on port %s:%d", args.ip, lighting_UDP_port)

    def unknown_message(self, msgaddr, *args):
        """Default handler for unrecognized OSC messages."""
        log.warning("Received unmapped OSC message %s: %s", msgaddr, args)

    def _find_fixture(self, fixture):
        """Return the interpolator selected by an OSC fixture argument.

        The argument is first tried as a fixture name, then as a numerical
        index into the fixture list.  Raises ValueError, TypeError or
        IndexError if neither interpretation succeeds.
        """
        interpolator = self.fixture_map.get(fixture)
        if interpolator is None:
            interpolator = self.splines[int(fixture)]
        return interpolator

    def lights_fixture(self, msgaddr, *args):
        """Process /fixture messages received via OSC over UDP.  The first value
        is treated as a fixture index or name, followed by one or more DMX
        values specifying the target level or color.  The fixture starts a
        smooth transition to the specified value.  Malformed messages are
        logged and ignored.
        """
        try:
            # limit the requested levels to the DMX range
            values = np.clip(np.array(args[1:]), 0, 255)

            interpolator = self._find_fixture(args[0])

            with self.array_lock:
                interpolator.set_target(values)

            self.fixture_msg_count += 1

        except Exception:
            # Exception rather than a bare except, so that interpreter exit
            # requests are not swallowed by the handler.
            log.warning("OSC message failed for /fixture: %s", args)

    def lights_spline(self, msgaddr, *args):
        """Process /spline messages received via OSC over UDP.  The first
        message value specifies a fixture index or name, followed by a
        row-major array representing a matrix of Bezier spline knots.  The
        matrix has one column per fixture channel and three rows per spline
        segment, and specifies DMX output values.  The implicit first knot is
        the final knot already held by the fixture's path.  Malformed messages
        are logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])

            # calculate the inferred matrix properties, ignoring excess values
            channels = interpolator.channels()
            num_rows = len(args[1:]) // channels
            num_segments = num_rows // 3
            num_used = 3*num_segments*channels

            # convert the usable args into a knot matrix
            knots = np.array(args[1:1+num_used]).reshape(3*num_segments, channels)

            # and add to the fixture lighting trajectory
            with self.array_lock:
                interpolator.add_spline(knots)

            self.spline_msg_count += 1

        except Exception:
            log.warning("error processing OSC spline message: %s", args)

    def lights_tempo(self, msgaddr, *args):
        """Process /tempo messages received via OSC over UDP.  The first message
        value specifies a fixture index or name, the second a spline execution
        rate in segments per minute.  Malformed messages are logged and ignored.
        """
        try:
            interpolator = self._find_fixture(args[0])
            tempo = float(args[1])

            # hold the lock like the other handlers so the rate does not
            # change while the event loop is updating the fixture
            with self.array_lock:
                interpolator.set_tempo(tempo)

            self.tempo_msg_count += 1

        except Exception:
            log.warning("error processing OSC tempo message: %s", args)

    #---------------------------------------------------------------
    # Event loop.
    def run(self):
        """Run the lighting server event loop to periodically update the spline
        interpolators and retransmit the DMX universe to the hardware.  OSC UDP
        network messages are received by a background thread and applied to the
        interpolators to be processed by this loop.  This does not return."""

        dt_seconds = self.dt_ns * 1e-9

        # number of cycles between status reports (five minutes)
        status_interval = max((5 * 60 * 1000*1000*1000) // self.dt_ns, 1)

        next_cycle_t = time.monotonic_ns() + self.dt_ns
        event_cycles = 0

        while True:
            # wait for the next cycle timepoint, keeping the long
            # term rate stable even if the short term timing jitters
            delay = next_cycle_t - time.monotonic_ns()
            if delay > 0:
                time.sleep(delay * 1e-9)
            next_cycle_t += self.dt_ns

            # advance every fixture by one nominal cycle period
            with self.array_lock:
                for spline in self.splines:
                    spline.update(dt_seconds, self.dmx)

            # and send to the hardware
            self.dmx.send_universe()

            # the following is not necessary given the output-only usage of the
            # Enttec device, and also appears to be triggering a FTDI driver bug on
            # recent versions of macOS:
            # self.dmx.flush_serial_input()

            # periodically log status; the counters are plain integers updated
            # outside the lock, so no lock is taken to read them
            if (event_cycles % status_interval) == 0:
                log.debug("event cycle %d: fixture msgs: %d, spline msgs: %d, tempo msgs: %d",
                          event_cycles,
                          self.fixture_msg_count,
                          self.spline_msg_count,
                          self.tempo_msg_count)
            event_cycles += 1
364
365#================================================================
366# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # set up logging
    open_log_file('logs/lighting-server.log')
    log.info("Starting lighting-server.py")
    np.set_printoptions(linewidth=np.inf)

    # Initialize the command parser.
    parser = argparse.ArgumentParser(description=cmd_desc)
    parser.add_argument('--ip', default=lighting_host_IP, help="Network interface address (default: %(default)s).")
    parser.add_argument('--dmx', default='/dev/ttyUSB0', help='DMX serial port device (default: %(default)s).')
    add_logging_args(parser)

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Modify logging settings as per common arguments.
    configure_logging(args)

    # Create the event loop.
    event_loop = EventLoop(args)

    # Begin the performance.  This may be safely interrupted by the user pressing Control-C.
    try:
        event_loop.run()

    except KeyboardInterrupt:
        log.info("User interrupted operation.")
        print("User interrupt, shutting down.")

    finally:
        # Release the hardware however the loop ended, not only on Control-C;
        # any other exception still propagates after the cleanup.
        event_loop.close()

    log.info("Exiting lighting-server.py")