Source code for demo1

#!/usr/bin/env python3
"""A show control system with GUI."""
################################################################
# Written in 2018-2019 by Garth Zeglin <garthz@cmu.edu>

# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.

# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.

################################################################
# standard Python libraries
from __future__ import print_function
import sys, logging, signal

# for documentation on the PyQt5 API, see http://pyqt.sourceforge.net/Docs/PyQt5/index.html
from PyQt5 import QtCore, QtWidgets

# This uses several modules from the course library.
import kf.QtWinch
import kf.QtMPD218
import kf.QtDMX
import kf.QtConfig
import kf.QtLog
import kf.QtPlotter

import kf.path
import kf.midi
import kf.osc
import kf.sim
import kf.winch
import kf.dmx

# set up logger for module
log = logging.getLogger('demo1')

################################################################
[docs]class ControlLogic(kf.midi.MIDIProcessor): """Core performance logic for processing MIDI input into winch commands.""" def __init__(self): super(ControlLogic,self).__init__() self.winches = None self.dmx = None self.osc_sender = None self.display = None self.all_axes = range(4) # index list for updating all motors self.frequency = 1.0 self.damping_ratio = 1.0 self.winch_dir = [0,0,0,0] # array to keep track of currently moving winches for aftertouch control # set up a metronome timer self.tempo = 60 # metronome rate in beats per minute self.metronome_timer = QtCore.QTimer() self.metronome_timer.start(int(60000/self.tempo)) # units are milliseconds self.metronome_timer.timeout.connect(self.metronome_tick) self.metronome_ticks = 0 # state variables for the metronome process self.pulsing = [0,0,0,0] # array of offsets for metronomic output pulsing return
[docs] def connect_winches(self, winches): """Attach a winch output device to the performance logic, either physical or simulated.""" self.winches = winches
[docs] def connect_dmx(self, dmx): """Attach a dmx output device to the performance logic.""" self.dmx = dmx
[docs] def connect_osc_sender(self, sender): """Attach a OSC sender port to the performance logic.""" self.osc_sender = sender
[docs] def connect_display(self, display): """Attach a console status output device to the performance logic.""" self.display = display
#---- methods related to the metronome -------------------------------------
[docs] def set_metronome_tempo(self, tempo): """Adjust the metronome timer rate. N.B. the underlying Qt QTimer accepts intervals in milliseconds, so this is fairly precise for beats but will be approximate for small subdivisions. E.g. a 120 BPM timer at 500 ms is precise, but the 32nd note subdivision at 62.5 ms would be rounded to 62 ms and run about 1% slow. :param tempo: tempo in BPM """ self.tempo = tempo self.metronome_timer.setInterval(int(60000/self.tempo)) # units are milliseconds
[docs] def metronome_tick(self): """Callback invoked at regular intervals governed by the metronome timer. In this particular example, the metronome can trigger a regular series of alternating forward and back movements to excite the path generator oscillators. """ # check if any pulsing is active, i.e., any value is non-zero if any(self.pulsing): # send out the next pulse movement command self.winches.increment_target(self.all_axes, self.pulsing) # and reverse the directions for the next iteration self.pulsing = [-d for d in self.pulsing] if self.osc_sender is not None: self.osc_sender.send("/demo1/tick", self.metronome_ticks) self.metronome_ticks += 1
#---- methods to process MIDI messages -------------------------------------
[docs] def note_on(self, channel, key, velocity): """Process a MIDI Note On event.""" log.debug("ControlLogic received note on: %d, %d", key, velocity) row, col, bank = self.decode_mpd218_key(key) # Each column maps to a specific winch. # Rows 0 and 1 move forward, rows 2 and 3 move backward. # The middle rows make small motions, the outer rows make larger motions. delta = velocity // 2 if row <= 1 else velocity // 2 if row == 0 or row == 3: delta = delta * 2 if bank == 0 or bank == 1: # bank A or B directly control the winches self.winches.increment_target(col, delta) self.winch_dir[col] = 1 if delta > 0 else -1 if delta < 0 else 0 self.pulsing[col] = 0 # reset pulsing on this winch else: # bank C pads instead invoke the metronome oscillation self.pulsing[col] = delta//8 log.debug("Pulsing array now %s", self.pulsing)
[docs] def note_off(self, channel, key, velocity): """Process a MIDI Note Off event.""" log.debug("ControlLogic received note off: %d, %d", key, velocity) row, col, bank = self.decode_mpd218_key(key) self.winch_dir[col] = 0 self.winches.set_velocity(col, 0)
[docs] def control_change(self, channel, cc, value): """Process a MIDI Control Change event.""" knob, bank = self.decode_mpd218_cc(cc) log.debug("Control change %d on knob %d bank %d", cc, knob, bank) if bank==0 or bank==1: self.mpd218_knob_bank_a(knob, value) else: self.mpd218_knob_bank_c(knob, value) return
[docs] def channel_pressure(self, channel, pressure): """Process a MIDI Channel Pressure event.""" velocities = [direction * 10 * pressure for direction in self.winch_dir] self.winches.set_velocity(self.all_axes, velocities) log.debug("aftertouch: %d, velocities: %s", pressure, velocities)
# ---- methods for decoded MPD218 drum pad events ----------- # This break out the responses to different banks of controls simply for legibility. # Each bank is selected with the 'CTRL BANK' and 'PAD BANK' keys.
[docs] def mpd218_knob_bank_a(self, knob, value): if knob == 1: # Knob #1 on MPD218, use to control resonant frequency self.frequency = 0.05 + 0.1 * value elif knob == 2: # Knob #2 on on MPD218, use to control damping ratio self.damping_ratio = 0.05 + 0.01 * value self.winches.set_freq_damping(self.all_axes, self.frequency, self.damping_ratio) self.display.set_status("Frequency: %f, damping ratio: %f" % (self.frequency, self.damping_ratio))
[docs] def mpd218_knob_bank_c(self, knob, value): # Send a color update to the hardware. self.dmx.set_channel(knob-1, 2*value) # Send a color update to the slider display. self.display.DMX_controller.set_channel(knob-1, 2*value) # Send a color update to the color field display. self.update_DMX_color_display(knob-1, 2*value)
#---- methods related to DMX -------------------------------------
[docs] def dmx_slider_change(self, channel, value): # Send a color update to the hardware. self.dmx.set_channel(channel, value) # Send a color update to the color field display. self.update_DMX_color_display(channel, value) return
[docs] def dmx_remote_update(self, fixture, color): # Send a color update to the hardware. self.dmx.set_channels(4*fixture, color) # Send a color update to the slider display. self.display.DMX_controller.set_channels(3*fixture, color) # Send a color update to the color field display. self.display.DMX_colors.set_color(fixture, color)
[docs] def update_DMX_color_display(self, channel, value): """Map a DMX channel update to a specific simulated fixture based on the current hardware.""" fixture = channel // 4 color = channel % 4 if fixture < 3 and color < 3: self.display.DMX_colors.set_channel(fixture, color, value)
################################################################
[docs]class SimWindow(QtWidgets.QMainWindow): """A custom main window which provides all GUI controls. This generally follows a model-view-controller convention in which this window provides the views, passing events to the application controller via callbacks. """ def __init__(self): super(SimWindow,self).__init__() # the graphical state self.cartoons = list() # WinchCartoon objects # other possible callbacks self.close_handler = None # create the GUI elements self._setupUi() # finish initialization self.show() return # ------------------------------------------------------------------------------------------------ def _setupUi(self): # basic window setup self.setWindowTitle("KF System Controller: Demo 1") self.statusbar = QtWidgets.QStatusBar(self) self.setStatusBar(self.statusbar) # set up tabbed page structure self.tabs = QtWidgets.QTabWidget() self.setCentralWidget(self.tabs) self.tabs.currentChanged.connect(self._tab_changed) # set up the main tab self.mainTab = QtWidgets.QWidget(self) self.mainLayout = QtWidgets.QVBoxLayout(self.mainTab) self.tabs.addTab(self.mainTab, 'Main') # generate a horizontal array of winch cartoons self.winchSet = kf.QtWinch.QtWinchSet() self.mainLayout.addWidget(self.winchSet) self.cartoons = self.winchSet.winches() # generate a simulated MPD218 controller self.MIDI_controller = kf.QtMPD218.QtMPD218() self.mainLayout.addWidget(self.MIDI_controller) # generate a DMX controller tab self.dmxTab = QtWidgets.QWidget(self) self.dmxLayout = QtWidgets.QVBoxLayout(self.dmxTab) self.DMX_colors = kf.QtDMX.QtDMXColors(fixtures=3) self.dmxLayout.addWidget(self.DMX_colors) self.DMX_controller = kf.QtDMX.QtDMXControls(channels=12) self.dmxLayout.addWidget(self.DMX_controller) self.tabs.addTab(self.dmxTab, 'DMX') # set up the configuration tab self.configForm = kf.QtConfig.QtConfigForm() self.tabs.addTab(self.configForm, 'Config') self.oscListenerConfig = kf.QtConfig.QtConfigOSCPort() self.configForm.addField("OSC message listener address:port", self.oscListenerConfig) self.midiCombo = kf.QtConfig.QtConfigComboBox() self.configForm.addField("MIDI input", self.midiCombo) self.winchSelect = kf.QtConfig.QtConfigComboBox() self.configForm.addField("Winch output serial port", self.winchSelect) self.dmxSelect = kf.QtConfig.QtConfigComboBox() self.configForm.addField("DMX output serial port", self.dmxSelect) self.oscSenderConfig = kf.QtConfig.QtConfigOSCPort(portnum=3762) self.configForm.addField("OSC destination adddress:port", self.oscSenderConfig) # set up the logging tab self.logDisplay = kf.QtLog.QtLog(level=logging.INFO) self.tabs.addTab(self.logDisplay, 'Log') # set up a more complex cartoon showing a suspended plotter system self.plotter = kf.QtPlotter.QtPlotterCartoon() self.tabs.addTab(self.plotter, 'Plotter') return # --- configure callbacks to connect GUI to application controller -----------------------------
[docs] def connect_close_handler(self, handler): """Connect a callback to be invoked when the window is about to close.""" self.close_handler = handler
[docs] def connect_control_logic(self, logic): """Connect an object to receive synthetic MIDI events; the object is assumed to have MIDIProcessor methods.""" self.MIDI_controller.connect_midi_processor(logic) self.DMX_controller.connect_callback(logic.dmx_slider_change)
[docs] def connect_osc_listener(self, listener): """Connect an OSC network listener to the port configuration control.""" self.oscListenerConfig.callback = listener.set_OSC_port
[docs] def connect_osc_sender(self, sender): """Connect an OSC network sender to the port configuration control.""" self.oscSenderConfig.callback = sender.set_OSC_port
[docs] def connect_midi_listener(self, listener): """Connect a MIDI input listener to the port configuration control.""" self.midiCombo.callback = listener.open_MIDI_input self.midiCombo.set_items(listener.get_midi_port_names())
[docs] def connect_winch(self, winch): """Connect a serial winch output to the port configuration control.""" self.winchSelect.callback = winch.set_and_open_port self.winchSelect.set_items(winch.available_ports()) return
[docs] def connect_dmx(self, dmx): """Connect a serial dmx output to the port configuration control.""" self.dmxSelect.callback = dmx.set_and_open_port self.dmxSelect.set_items(dmx.available_ports()) return
# --- window and Qt event processing -------------------------------------------------------------
[docs] def set_status(self, string): """Update the status bar at the bottom of the display to show the provided string.""" self.statusbar.showMessage(string) return
def _tab_changed(self, index): log.debug("Tab changed to %d", index) return
[docs] def closeEvent(self, event): """Qt callback received before windows closes.""" log.info("Received window close event.") if self.close_handler is not None: self.close_handler() super(SimWindow,self).closeEvent(event) return
# -------------------------------------------------------------------------------------------------- ################################################################
[docs]class OutputPatcher(object): """This object routes winch output commands to one or more streams. E.g. it can duplicate outputs to both a serial port winch and a simulation. """ def __init__(self): self.outputs = list()
[docs] def add_output(self, output): self.outputs.append(output)
#------------------------------------------------------------------------------ # The command API follows which mimics the interface to the actual winches.
[docs] def set_target(self, axis, position): for output in self.outputs: output.set_target(axis, position)
[docs] def increment_target(self, axis, offset): for output in self.outputs: output.increment_target(axis, offset)
[docs] def set_velocity(self, axis, velocity): for output in self.outputs: output.set_velocity(axis, velocity)
[docs] def set_freq_damping(self, axis, freq, ratio): for output in self.outputs: output.set_freq_damping(axis, freq, ratio)
################################################################
[docs]class MainApp(object): """Main application controller object holding any non-GUI related state.""" def __init__(self): # create the interface window self.window = SimWindow() self.window.connect_close_handler(self._close_handler) # initialize the winch set simulator self.sim = kf.sim.SimWinch() # Initialize the performance logic. self.control_logic = ControlLogic() self.control_logic.connect_display(self.window) self.window.connect_control_logic(self.control_logic) # Initialize the MIDI input system. self.midi_listener = kf.midi.QtMIDIListener() self.midi_listener.connect_midi_processor(self.control_logic) self.window.connect_midi_listener(self.midi_listener) # Initialize the OSC message listener and dispatch system. self.osc_listener = kf.osc.QtOSCListener() self.osc_listener.map_handler("/midi", self._received_remote_midi) self.osc_listener.map_handler("/winch/*", self._received_remote_winch) self.osc_listener.map_handler("/dmx/*", self._received_remote_dmx) self.osc_listener.open_receiver() self.window.connect_osc_listener(self.osc_listener) # Initialize the OSC message sender. self.osc_sender = kf.osc.QtOSCSender() self.osc_sender.open_sender() self.window.connect_osc_sender(self.osc_sender) self.control_logic.connect_osc_sender(self.osc_sender) # Initialize the hardware winch system. self.winch = kf.winch.QtSerialWinch() self.window.connect_winch(self.winch) # Initialize the DMX lighting system. self.dmx = kf.dmx.QtDMXUSBPro() self.window.connect_dmx(self.dmx) self.control_logic.connect_dmx(self.dmx) # Create a virtual output to duplicate commands to simulation and winch. self.output = OutputPatcher() self.output.add_output(self.winch) self.output.add_output(self.sim) self.control_logic.connect_winches(self.output) # start the animation timer self.frame_timer = QtCore.QTimer() self.frame_timer.start(40) # units are milliseconds self.frame_timer.timeout.connect(self._timer_tick) # Attach a handler to the keyboard interrupt (control-C). signal.signal(signal.SIGINT, self._sigint_handler) return # ---- application event handlers ----------------------------------------------- def _timer_tick(self): # Method called at intervals by the animation timer to update the model and graphics. self.sim.update_for_interval(0.040) positions = self.sim.positions() for pos, cartoon in zip(positions, self.window.cartoons): cartoon.update_position(pos) self.window.plotter.update_positions(positions[0:2]) def _sigint_handler(self, signal, frame): print("Keyboard interrupt caught, running _close_handler()...") self._close_handler() sys.exit(0) def _close_handler(self): log.info("_close_handler closing any winches.") # This will help to avoid stale lock files. self.winch.close() self.dmx.close() # ---- process OSC network messages --------------------------------------- # The midi_osc_bridge.py tool can tunnel MIDI messages over the network as # OSC packets. This routes any bridged MIDI data into the same performance # logic. def _received_remote_midi(self, msgaddr, *args): log.debug("remote midi: %s", " ".join([str(arg) for arg in args])) self.control_logic.decode_message(args) return def _received_remote_winch(self, msgaddr, *args): # This receives OSC messages prefixed with /winch intended for direct # application to the winches, bypassing the control logic. log.debug("remote winch: %s %s", msgaddr, " ".join([str(arg) for arg in args])) if msgaddr=='/winch/delta' and len(args) >= 2: flags = args[0] try: channels = [{'x':0, 'y':1, 'z':2, 'a':3}[letter] for letter in flags] except KeyError: log.debug("Remote winch command has invalid channel flag: %s", flags) return if len(channels) != len(args)-1: log.warning("Remote winch command has mismatched flags and arguments: %s, %s" % (flags, args[1:])) else: self.output.increment_target(channels, args[1:]) def _received_remote_dmx(self, msgaddr, *args): # This receives OSC messages prefixed with /dmx intended for direct # application to the DMX output, bypassing the control logic. log.debug("remote DMX: %s %s", msgaddr, " ".join([str(arg) for arg in args])) if msgaddr=='/dmx/fixture' and len(args) == 4: fixture = args[0] self.control_logic.dmx_remote_update(fixture, args[1:])
################################################################ def _main(): # initialize the Qt system itself app = QtWidgets.QApplication(sys.argv) # create the main application controller main = MainApp() # run the event loop until the user is done log.info("Starting event loop.") sys.exit(app.exec_()) ################################################################ # Main script follows. This sequence is executed when the script is initiated from the command line. if __name__ == "__main__": _main()