Suitcase Theater

The suitcase theater setup includes a suitcase containing four stepper motors and two DMX-controlled light fixtures. The live performance is operated by a single Python script.

Related simulation: Suitcase Robot Model.

The source files can be browsed directly from suitcase_theater/ or may be downloaded as suitcase_theater.zip.

The script requires numpy and pySerial to be installed in your Python 3 environment (see Python 3 Installation).

run-show.py

  1#!/usr/bin/env python3
  2
  3cmd_desc = "Run a scripted performance using Arduino-controlled stepper motors and DMX-controlled lighting."
  4
  5import argparse
  6import time
  7import math
  8import logging
  9
 10# import the calendar policy
 11from HuntLibrary import current_time_properties
 12
 13# import the StepperWinch interface
 14import steppers
 15
 16# import the DMXUSBPro interface
 17import dmxusbpro
 18
 19# import the specific performance script
 20from demo_script import SuitcasePerformance
 21
 22# initialize logging for this module
 23log = logging.getLogger('show')
 24
 25#================================================================
 26class Show:
 27    def __init__(self, args):
 28
 29        # configure event timing
 30        self.dt_ns = 50*1000*1000  # 20 Hz in nanoseconds per cycle
 31        # self.dt_ns *= 10 # tmp slow the rate
 32
 33        # Open the DMX controller serial port
 34        log.info("Opening DMX serial port.")
 35        self.dmx = dmxusbpro.DMXUSBPro(port=args.dmx, verbose=args.verbose, debug=args.debug)
 36        self.dmx.open_serial_port()
 37
 38        # Current light state used for smoothing changes.
 39        self.spot_intensity = [0.0, 0.0]
 40
 41        # Open the Arduino serial port
 42        log.info("Opening Arduino serial port.")
 43        self.steppers = steppers.StepperWinchClient(port=args.arduino, verbose=args.verbose, debug=args.debug)
 44
 45        log.info("Waiting for Arduino wakeup.")
 46        self.steppers.wait_for_wakeup()
 47        self.steppers.set_freq_damping(0.5, 1.0)
 48
 49        # Create the performance controller(s)
 50        self.controller = SuitcasePerformance()
 51
 52        # Test the current time
 53        policy = current_time_properties()
 54        log.info("Current time policy: %s", policy)
 55
 56        # configure party mode which ignores calendar
 57        self.party_mode = args.party
 58        if self.party_mode:
 59            log.info("Party mode enabled.")
 60        return
 61
 62    def close(self):
 63        # Issue a command to turn off the drivers, then shut down the connections.
 64        log.info("Closing serial ports.")
 65        self.steppers.motor_enable(False)
 66        self.steppers.close()
 67        self.dmx.close_serial_port()
 68
 69    #---------------------------------------------------------------
 70    # Action primitives for current setup.
 71    def set_spotlight(self, name, level):
 72
 73        # This is the right light as viewed from outside, i.e. stage left.
 74        if name == 'right':
 75            # convert a unit value (0.0,1.0) to 8-bit binary (0, 255)
 76            value = min(max(int(level*255), 0), 255)
 77            self.dmx.universe[0] = value
 78
 79        # This is the left light as viewed from outside, i.e. stage right.
 80        elif name == 'left':
 81            value = min(max(int(level*255), 0), 255)
 82            self.dmx.universe[1] = value
 83
 84        # This sets the available lights from an array, list, tuple, or sequence
 85        elif name == 'all':
 86            values = [min(max(int(val*255), 0), 255) for val in level]
 87            self.dmx.universe[0:2] = values
 88
 89        # Update the hardware
 90        self.dmx.send_universe()
 91
 92    def send_angles(self, angles):
 93        steps = [int(angle * (800/(2*math.pi))) for angle in angles]
 94        self.steppers.send_move(steps)
 95        return
 96
 97    #---------------------------------------------------------------
 98    # Event loop for operating the performance
 99    def run(self):
100        start_t = time.monotonic_ns()
101        next_cycle_t = start_t + self.dt_ns
102
103        while True:
104            # wait for the next cycle timepoitn, keeping the long
105            # term rate stable even if the short term timing jitters
106            now_ns = time.monotonic_ns()
107            delay = max(next_cycle_t - now_ns, 0)
108            if (delay > 0):
109                time.sleep(delay * 1e-9)
110            next_cycle_t += self.dt_ns
111            now_ns = time.monotonic_ns()
112            now_seconds = (now_ns - start_t)*1e-9
113            log.debug("Time is %f" %(now_seconds))
114
115            # update the performance
116            self.controller.poll(now_seconds)
117
118            # keep the stepper input buffer empty
119            self.steppers.poll_status()
120
121            # check the time policy
122            policy = current_time_properties()
123            if self.party_mode or policy['is_show_time']:
124                # output motion commands to the stepper hardware
125                self.send_angles(self.controller.target)
126
127                # apply first-order smoothing to the lighting commands
128                self.spot_intensity[0] += 0.2 * (self.controller.lights['left'] - self.spot_intensity[0])
129                self.spot_intensity[1] += 0.2 * (self.controller.lights['right'] - self.spot_intensity[1])
130                self.set_spotlight('all', self.spot_intensity)
131            else:
132                # keep sending commands for a quiescent state
133                self.send_angles([0.0, 0.0, 0.0, 0.0])
134                self.spot_intensity[0] *= 0.8
135                self.spot_intensity[1] *= 0.8
136                self.set_spotlight('all', self.spot_intensity)
137
#================================================================
# The following section is run when this is loaded as a script.
if __name__ == "__main__":

    # Set up logging, appending to show.log.  Passing the filename lets the
    # logging module own the file handle and close it at shutdown.
    log_format = '%(asctime)s:%(levelname)s:%(name)s: %(message)s'
    log_datefmt = "%Y-%m-%d %H:%M:%S"
    logging.basicConfig(filename='show.log', filemode='a', level=logging.INFO, format=log_format, datefmt=log_datefmt)
    log.info("Starting run-show.py")

    # Initialize the command parser.
    parser = argparse.ArgumentParser(description=cmd_desc)
    parser.add_argument('--dmx', default='/dev/ttyUSB0', help='DMX serial port device (default: %(default)s).')
    parser.add_argument('--arduino', default='/dev/ttyACM0', help='Arduino serial port device (default is %(default)s.)')
    parser.add_argument('--party', action='store_true', help='Enable party mode which overrides calendar policy.')
    parser.add_argument('--debug', action='store_true', help='Enable debugging output to log file.')
    parser.add_argument('--verbose', action='store_true', help='Enable even more detailed logging output.')

    # Parse the command line, returning a Namespace.
    args = parser.parse_args()

    # Enable debug messages in logs on request.
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Create the show controller.
    show = Show(args)

    # Begin the performance.  This may be safely interrupted by the user pressing Control-C.
    try:
        show.run()

    except KeyboardInterrupt:
        log.info("User interrupted performance.")
        print("User interrupted performance, shutting down.")

    except Exception:
        # Record the failure in the log before it propagates.
        log.exception("Performance stopped by an unexpected error.")
        raise

    finally:
        # Always turn off the motor drivers and close the serial ports,
        # whether the loop was interrupted by the user or failed.
        show.close()

dmxusbpro.py

dmxusbpro.py

Class to manage a connection to a serial-connected Enttec DMXUSB Pro interface. Requires pySerial and numpy.

class suitcase_theater.dmxusbpro.DMXUSBPro(port=None, verbose=False, debug=False, universe_size=25, **kwargs)[source]

Class to manage a connection to a serial-connected Enttec DMXUSB Pro interface. This only supports output.

Parameters:
  • port – the name of the serial port device

  • verbose – flag to increase console output

  • debug – flag to print raw inputs on the console

  • kwargs – collect any unused keyword arguments

close_serial_port()[source]

Shut down the serial connection, after which this object may no longer be used.

flush_serial_input()[source]

Clear the input buffer.

is_connected()[source]

Return true if the serial port device is open.

open_serial_port()[source]

Open the serial connection to the controller.

send_universe()[source]

Issue a DMX universe update.

set_serial_port_name(name)[source]

Set the name of the serial port device.

steppers.py

steppers.py : sample code in Python to communicate with an Arduino running StepperWinch

No copyright, 2021-2023, Garth Zeglin. This file is explicitly placed in the public domain.

class suitcase_theater.steppers.StepperWinchClient(port=None, verbose=False, debug=False, **kwargs)[source]

Class to manage a connection to a serial-connected Arduino running the StepperWinch script.

Parameters:
  • port – the name of the serial port device

  • verbose – flag to increase console output

  • debug – flag to print raw inputs on the console

  • kwargs – collect any unused keyword arguments

close()[source]

Shut down the serial connection to the Arduino, after which this object may no longer be used.

motor_enable(value=True)[source]

Issue a command to enable or disable the stepper motor drivers.

move_to(position)[source]

Issue a command to move to a [x, y, z, a] absolute position (specified in microsteps) and wait until completion.

Parameters:

position – a list or tuple with at least four elements

send_move(position)[source]

Issue a command to move to a [x, y, z, a] absolute position (specified in microsteps) and return immediately.

Parameters:

position – a list or tuple with at least four elements

set_freq_damping(freq, damping)[source]

Issue a command to set the second-order model gains.

wait_for_wakeup()[source]

Issue a status query and wait until an ‘awake’ status has been received.

HuntLibrary.py

HuntLibrary.py

Schedule and calendar calculations for determining state of the HL A11 performance space and environs.

suitcase_theater.HuntLibrary.current_time_properties()[source]

Return a dictionary with the current time and a number of properties useful for time-based policy decisions.

suitcase_theater.HuntLibrary.datetime_properties(now)[source]

Given a datetime object, return a dictionary with the time and a number of properties useful for time-based policy decisions.

demo_script.py

 1# script.py
 2# Sample abstract performance script for operating the suitcase and lighting hardware on either
 3# the Webots simulation or the physical hardware.
 4# No copyright, 2023, Garth Zeglin.  This file is explicitly placed in the public domain.
 5
 6print("loading script.py...")
 7
 8# Standard Python imports.
 9import math, random
10
11# Define the abstract performance interface.
12from performance import Performance
13
# Define an abstract performance controller which can run either the Webots simulation or a physical machine.
class SuitcasePerformance(Performance):
    """Two-state demonstration performance alternating 'rest' and 'moving'.

    The spotlight and motor-target setters and ``num_motors`` are not defined
    here; presumably they come from the ``Performance`` superclass.
    """

    def __init__(self):

        # Initialize the superclass which implements the abstract robot interface.
        super().__init__()

        # Initialize the controller state machine
        self.last_t = 0.0            # time passed to the previous poll() call
        self.event_timer = 0.0       # time remaining until the next state change
        self.state_index = 'rest'    # current state name: 'rest' or 'moving'

    # Entry point for periodic updates.
    def poll(self, t):
        """Advance the state machine to time ``t``.

        :param t: the current performance time; the caller in run-show.py
            passes elapsed seconds.
        """
        dt = t - self.last_t
        self.last_t = t

        # Count down the event timer and switch state when it expires.
        self.event_timer -= dt
        if self.event_timer < 0:
            if self.state_index == 'rest':
                # Start a 2.5 time-unit move: right light on, left light
                # off, and each motor sent to a random target in (-pi, pi].
                self.set_spotlight('right', 1.0)
                self.set_spotlight('left', 0.0)
                self.state_index = 'moving'
                self.event_timer += 2.5
                for j in range(self.num_motors):
                    pos = math.pi * (1 - 2*random.random())
                    self.set_motor_target(j, pos)

            else:
                # Return to rest for 1.5 time units: lights swapped and
                # every motor target back at zero.
                self.state_index = 'rest'
                self.set_spotlight('right', 0.0)
                self.set_spotlight('left', 1.0)
                self.event_timer += 1.5
                for j in range(self.num_motors):
                    self.set_motor_target(j, 0)

            print(f"Switched to state {self.state_index}")