#!/usr/bin/env python3
"""A simulated winch system with GUI."""
################################################################
# Written in 2018-2019 by Garth Zeglin <garthz@cmu.edu>
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
################################################################
# standard Python libraries
from __future__ import print_function
import sys, logging
# for documentation on the PyQt5 API, see http://pyqt.sourceforge.net/Docs/PyQt5/index.html
from PyQt5 import QtCore, QtWidgets
# This uses several modules from the course library.
import kf.QtWinch
import kf.QtMPD218
import kf.QtConfig
import kf.QtLog
import kf.QtPanelScene
import kf.QtConcentricLoops
import kf.QtOrbits
import kf.QtWaves
import kf.path
import kf.midi
import kf.osc
import kf.sim
import kf.loops
import kf.orbits
import kf.waves
# Module-level logger shared by every class and function in this file; it is
# registered under the name 'sim3'.
log = logging.getLogger('sim3')
################################################################
# User-facing help text, displayed verbatim in a QLabel on the 'Help' tab of the
# main window.  Please keep these user instructions up to date with the
# ControlLogic class, which implements the pad and knob mappings described here.
help_tab_text = """\
General Help
(N.B. may not be up to date with code.)
Pad bank A: direct control of eight winches
red-L-up red-R-up yel-L-up yel-R-up
red-L-dn red-R-dn yel-L-dn yel-R-dn
grn-L-up grn-R-up blu-L-up blu-R-up
grn-L-dn grn-R-dn blu-L-dn blu-R-dn
Pad bank B: fixed setpoints for eight winches
red-L-top red-R-top yel-L-top yel-R-top
red-L-bot red-R-bot yel-L-bot yel-R-bot
grn-L-top grn-R-top blu-L-top blu-R-top
grn-L-bot grn-R-bot blu-L-bot blu-R-bot
Pad bank C: set pulsing mode for eight winches
Knob 1: tempo (freq) of winch oscillation
Knob 2: damping of winch oscillation
Knob 3-6: stage lighting intensity level
"""
################################################################
class ControlLogic(kf.midi.MIDIProcessor):
    """Core performance logic for processing MIDI input into winch commands.

    The object is wired to its collaborators after construction using the
    ``connect_*`` methods.  Incoming MIDI events arrive through the
    ``note_on``, ``note_off``, ``control_change`` and ``channel_pressure``
    methods; generator events (loops, orbits, waves) arrive through private
    listener callbacks and are turned into outgoing MIDI notes when the matching
    user mode is active.
    """

    # MIDI notes emitted when a loop completes a cycle:  F  Ab  C  Db  Eb  F  Ab
    _LOOP_NOTES = (41, 44, 48, 49, 51, 53, 56)

    # MIDI notes emitted when an orbiting particle collides with the boundary.
    _ORBIT_NOTES = (60, 62, 64, 65, 67, 69, 71)

    def __init__(self):
        super(ControlLogic, self).__init__()

        # Collaborators, attached later via the connect_* methods.
        self.winches = None
        self.display = None
        self.lights = None
        self.loops = None
        self.orbits = None
        self.waves = None
        self.midi_out = None

        # Name of the currently selected user mode, set via set_user_mode().
        self.user_mode = None

        self.channels = 8
        self.all_axes = range(self.channels)  # index list for updating all motors
        self.frequency = 1.0
        self.damping_ratio = 1.0

        # array to keep track of currently moving winches for aftertouch control
        self.winch_dir = [0] * self.channels

        # state variables for the metronome process:
        # array of offsets for metronomic output pulsing
        self.pulsing = [0] * self.channels

        # set up a metronome timer
        self.tempo = 60  # metronome rate in beats per minute
        self.metronome_timer = QtCore.QTimer()
        self.metronome_timer.timeout.connect(self.metronome_tick)
        self.metronome_timer.start(int(60000 / self.tempo))  # units are milliseconds

    def connect_winches(self, winches):
        """Attach a winch output device to the performance logic, either physical or simulated."""
        self.winches = winches

    def connect_display(self, display):
        """Attach a console status output device to the performance logic."""
        self.display = display

    def connect_lights(self, lights):
        """Attach a stage lighting output device to the performance logic."""
        self.lights = lights

    def connect_MIDI_out(self, midi_out):
        """Attach a MIDI sender to the performance logic."""
        self.midi_out = midi_out

    def connect_loops(self, loops):
        """Attach a tape loop generator to the performance logic and register for its cycle events."""
        self.loops = loops
        self.loops.connect_cycle_listener(self._loop_cycle_complete)

    def connect_orbits(self, orbits):
        """Attach an orbit dynamics generator to the performance logic and register for its collision events."""
        self.orbits = orbits
        self.orbits.connect_boundary_collision_listener(self._boundary_collision)

    def connect_waves(self, waves):
        """Attach a wave dynamics generator to the performance logic and register for its rising-edge events."""
        self.waves = waves
        self.waves.connect_rising_edge_listener(self._rising_wave_edge)

    def set_user_mode(self, name):
        """Record the name of the active user mode.

        The generator callbacks and bank-B knobs compare this against
        'Loops', 'Orbits' and 'Waves'.

        :param name: mode name string
        """
        self.user_mode = name

    #---- methods related to the metronome -------------------------------------
    def set_metronome_tempo(self, tempo):
        """Adjust the metronome timer rate.  N.B. the underlying Qt QTimer accepts
        intervals in whole milliseconds, so this is fairly precise for beats but
        will be approximate for small subdivisions.  E.g. a 120 BPM timer at 500 ms
        is precise, but the 32nd note subdivision at 62.5 ms would be truncated to
        62 ms and so run about 1% fast.

        :param tempo: tempo in BPM; a value of zero raises ZeroDivisionError
        """
        self.tempo = tempo
        self.metronome_timer.setInterval(int(60000 / self.tempo))  # units are milliseconds

    def metronome_tick(self):
        """Callback invoked at regular intervals governed by the metronome timer.  In
        this particular example, the metronome can trigger a regular series of
        alternating forward and back movements to excite the path generator
        oscillators.
        """
        # nothing to do unless pulsing is active, i.e., any value is non-zero
        if not any(self.pulsing):
            return

        # send out the next pulse movement command
        self.winches.increment_target(self.all_axes, self.pulsing)

        # and reverse the directions for the next iteration
        self.pulsing = [-offset for offset in self.pulsing]

    #---- methods related to the loop generator -------------------------------------
    def _loop_cycle_complete(self, channel, time):
        """Listener for loop generator cycle events; plays a note in 'Loops' mode."""
        log.info("Loop %d completed at %f", channel, time)

        # Wrap the channel into the note table, as _boundary_collision does, so
        # a generator with more channels than notes cannot raise IndexError.
        note = self._LOOP_NOTES[channel % len(self._LOOP_NOTES)]
        if self.user_mode == 'Loops':
            self.midi_out.note_on(1, note + 24, 100)

    #---- methods related to the orbit generator -------------------------------------
    def _boundary_collision(self, channel, velocity):
        """Listener for orbit boundary collisions; plays a note in 'Orbits' mode."""
        # log.debug("Particle %d collided with %f", channel, velocity)
        note = self._ORBIT_NOTES[channel % 7]
        if self.user_mode == 'Orbits':
            # NOTE(review): only the upper MIDI velocity bound is clamped;
            # confirm that the reported velocity is never negative.
            self.midi_out.note_on(1, note, min(127, int(200 * velocity)))

    #---- methods related to the wave generator -------------------------------------
    def _rising_wave_edge(self, wave, particle):
        """Listener for wave rising edges; in 'Waves' mode, plays a note for wave 0 only."""
        if self.user_mode == 'Waves' and wave == 0:
            self.midi_out.note_on(1, 48 + particle, 64)

    #---- methods to process MIDI messages -------------------------------------
    def note_on(self, channel, key, velocity):
        """Process a MIDI Note On event.

        The key is decoded into a pad (row, col, bank) position.  Bank 0 nudges
        a winch target, bank 1 moves a winch to a fixed setpoint, and any other
        bank sets the metronome pulsing offset for the winch.

        :param channel: MIDI channel (unused)
        :param key: MIDI note number
        :param velocity: MIDI note velocity, used to scale the movement
        """
        log.debug("ControlLogic received note on: %d, %d", key, velocity)
        # decode_mpd218_key is not defined in this class; presumably inherited
        # from kf.midi.MIDIProcessor.
        row, col, bank = self.decode_mpd218_key(key)

        # two pads per winch in front-back pairs
        winch = col + 4 * (row // 2)

        # movement magnitude scales with velocity; sign depends on pad position
        delta = 5 * velocity
        if row in (0, 2):
            delta = -delta
        if col in (1, 3):
            delta = -delta

        if bank == 0:  # bank A directly controls the winches
            self.winches.increment_target(winch, delta)
            self.winch_dir[winch] = 1 if delta > 0 else -1 if delta < 0 else 0
            self.pulsing[winch] = 0  # reset pulsing on this winch

        elif bank == 1:  # bank B invokes fixed setpoints
            self.pulsing[winch] = 0  # reset pulsing on this winch
            if row in (0, 2):
                # lower setpoint, with the sign determined by the column
                setpoint = -6000 if col in (0, 2) else 6000
            else:
                # upper setpoint is zero for every column
                setpoint = 0
            self.winches.set_target(winch, setpoint)

        else:  # bank C pads invoke the metronome oscillation
            self.pulsing[winch] = delta
            log.debug("Pulsing array now %s", self.pulsing)

    def note_off(self, channel, key, velocity):
        """Process a MIDI Note Off event by stopping the winch for the released pad.

        :param channel: MIDI channel (unused)
        :param key: MIDI note number
        :param velocity: MIDI release velocity (only logged)
        """
        log.debug("ControlLogic received note off: %d, %d", key, velocity)
        row, col, bank = self.decode_mpd218_key(key)
        winch = col + 4 * (row // 2)
        self.winch_dir[winch] = 0
        self.winches.set_velocity(winch, 0)

    def control_change(self, channel, cc, value):
        """Process a MIDI Control Change event.

        In bank 0, knob 1 sets the winch resonant frequency, knob 2 sets the
        damping ratio, and any other knob sets a light intensity.  In bank 1
        the knobs adjust the generator selected by the current user mode.

        :param channel: MIDI channel (unused)
        :param cc: MIDI controller number
        :param value: MIDI controller value
        """
        # decode_mpd218_cc is presumably inherited from kf.midi.MIDIProcessor.
        knob, bank = self.decode_mpd218_cc(cc)
        log.debug("Control change %d on knob %d bank %d", cc, knob, bank)

        if bank == 0:
            if knob == 1:  # Knob #1 on MPD218, use to control resonant frequency
                tempo = 40 + value             # beats per minute
                self.frequency = tempo / 60.0  # beats per second
                self.winches.set_freq_damping(self.all_axes, self.frequency, self.damping_ratio)
                self.display.set_status("Tempo: %f, frequency: %f, damping ratio: %f" % (tempo, self.frequency, self.damping_ratio))

            elif knob == 2:  # Knob #2 on MPD218, use to control damping ratio
                self.damping_ratio = 0.05 + 0.01 * value
                self.winches.set_freq_damping(self.all_axes, self.frequency, self.damping_ratio)
                self.display.set_status("Frequency: %f, damping ratio: %f" % (self.frequency, self.damping_ratio))

            else:
                # for now, simple direct control of lights; knob 3 maps to light 0
                self.lights.set_intensity(knob - 3, value / 127.0)

        elif bank == 1:
            if self.user_mode == 'Loops':
                self.loops.set_speed(knob - 1, 1 + value / 2)

            elif self.user_mode == 'Orbits':
                if knob == 1:
                    force = 0.01 * value
                    log.debug("Setting radial orbit force of %f", force)
                    self.orbits.set_applied_force(0.0, force)
                elif knob == 2:
                    force = 0.01 * value
                    log.debug("Setting tangential orbit force of %f", force)
                    self.orbits.set_applied_force(force, 0.0)
                elif knob == 3:
                    gravity = 0.005 * value
                    log.debug("Setting gravity of %f", gravity)
                    self.orbits.set_gravity(gravity)
                elif knob == 4:
                    damping = 0.005 * value
                    log.debug("Setting damping of %f", damping)
                    self.orbits.set_damping(damping)

            elif self.user_mode == 'Waves':
                if knob == 1:
                    self.waves.set_all_damping(0.01 * value)
                elif knob == 2:
                    self.waves.set_all_period(60 / (1 + value))

    def channel_pressure(self, channel, pressure):
        """Process a MIDI Channel Pressure event.

        Aftertouch pressure sets the velocity of every winch, signed by the
        direction recorded in ``winch_dir``; winches with direction zero get
        zero velocity.

        :param channel: MIDI channel (unused)
        :param pressure: MIDI channel pressure value
        """
        velocities = [direction * 10 * pressure for direction in self.winch_dir]
        self.winches.set_velocity(self.all_axes, velocities)
        log.debug("aftertouch: %d, velocities: %s", pressure, velocities)
################################################################
class SimWindow(QtWidgets.QMainWindow):
    """A custom main window which provides all GUI controls.  This generally follows
    a model-view-controller convention in which this window provides the views,
    passing events to the application controller via callbacks.
    """

    # Tabs which are utility pages rather than performance modes; selecting one
    # of these leaves the MIDI processor's user mode unchanged.
    _NON_MODE_TABS = ('Log', 'Config')

    def __init__(self):
        super(SimWindow, self).__init__()

        # the graphical state
        self.cartoons = list()  # WinchCartoon objects
        self.midi_processor = None

        # Name of the tab currently hosting the shared scene widget ('Main' or
        # 'Scene'); None until the scene has been created.
        self.scene_location = None

        # create the GUI elements
        self._setupUi()

        # finish initialization
        self.show()

    # ------------------------------------------------------------------------------------------------
    def _setupUi(self):
        """Build the status bar and every tab page of the window."""
        # basic window setup
        self.setWindowTitle("KF System Simulator 3")
        self.statusbar = QtWidgets.QStatusBar(self)
        self.setStatusBar(self.statusbar)

        # set up tabbed page structure
        self.tabs = QtWidgets.QTabWidget()
        self.setCentralWidget(self.tabs)
        self.tabs.currentChanged.connect(self._tab_changed)

        # set up the main tab
        self.mainTab = QtWidgets.QWidget(self)
        self.mainLayout = QtWidgets.QVBoxLayout(self.mainTab)
        self.tabs.addTab(self.mainTab, 'Main')

        # set up complex cartoon showing a suspended winch system
        self.scene = kf.QtPanelScene.QtPanelScene()
        self.mainLayout.addWidget(self.scene)
        self.scene_location = 'Main'

        # generate a simulated MPD218 controller
        self.MIDI_controller = kf.QtMPD218.QtMPD218()
        self.mainLayout.addWidget(self.MIDI_controller)

        # set up the scene tab; it starts empty and borrows the scene widget
        # from the main tab whenever it is selected (see _tab_changed)
        self.sceneTab = QtWidgets.QWidget(self)
        self.sceneLayout = QtWidgets.QVBoxLayout(self.sceneTab)
        self.tabs.addTab(self.sceneTab, 'Scene')

        # set up the configuration tab
        self.configForm = kf.QtConfig.QtConfigForm()
        self.tabs.addTab(self.configForm, 'Config')

        self.oscListenerConfig = kf.QtConfig.QtConfigOSCPort()
        self.configForm.addField("OSC message listener address:port", self.oscListenerConfig)

        self.midiCombo = kf.QtConfig.QtConfigComboBox()
        self.configForm.addField("MIDI input", self.midiCombo)

        self.midiOutputCombo = kf.QtConfig.QtConfigComboBox()
        self.configForm.addField("MIDI output", self.midiOutputCombo)

        # set up the logging tab
        self.logDisplay = kf.QtLog.QtLog(level=logging.INFO)
        self.tabs.addTab(self.logDisplay, 'Log')

        # set up a tab with the winch cartoons
        self.winchSet = kf.QtWinch.QtWinchSet(count=8)
        self.tabs.addTab(self.winchSet, 'Winches')
        self.cartoons = self.winchSet.winches()

        # set up a tab with the loop generator
        self.loops = kf.QtConcentricLoops.QtConcentricLoops()
        self.tabs.addTab(self.loops, 'Loops')

        # set up a tab with the orbit generator
        self.orbits = kf.QtOrbits.QtOrbits()
        self.tabs.addTab(self.orbits, 'Orbits')

        # set up a tab with the waves generator
        self.waves = kf.QtWaves.QtWaves1D()
        self.tabs.addTab(self.waves, 'Waves')

        # set up the help tab
        self.help = QtWidgets.QLabel()
        self.help.setText(help_tab_text)
        self.tabs.addTab(self.help, 'Help')

        # change the tab initially visible
        self.tabs.setCurrentWidget(self.orbits)
        # self.tabs.setCurrentWidget(self.waves)

    # --- configure callbacks to connect GUI to application controller -----------------------------
    def connect_midi_processor(self, processor):
        """Connect an object to receive synthetic MIDI events; the object is assumed to have MIDIProcessor methods.

        The processor is also told the name of the tab which is currently
        visible.  The initial tab is selected while the window is being built,
        before any processor is attached, so without this the processor would
        not learn the user mode until the user first switched tabs.
        """
        self.MIDI_controller.connect_midi_processor(processor)
        self.midi_processor = processor
        self._notify_user_mode(self.tabs.tabText(self.tabs.currentIndex()))

    def connect_osc_listener(self, listener):
        """Connect an OSC network listener to the port configuration control."""
        self.oscListenerConfig.callback = listener.set_OSC_port

    def connect_midi_listener(self, listener):
        """Connect a MIDI input listener to the port configuration control."""
        self.midiCombo.callback = listener.open_MIDI_input
        self.midiCombo.set_items(listener.get_midi_port_names())

    def connect_midi_sender(self, sender):
        """Connect a MIDI output sender to the port configuration control."""
        self.midiOutputCombo.callback = sender.open_MIDI_output
        self.midiOutputCombo.set_items(sender.get_midi_port_names())

    # --- window and Qt event processing -------------------------------------------------------------
    def set_status(self, string):
        """Update the status bar at the bottom of the display to show the provided string."""
        self.statusbar.showMessage(string)

    def _notify_user_mode(self, tab_name):
        """Report the named tab to the MIDI processor as the user mode, unless it is a utility tab."""
        if self.midi_processor is None:
            return
        if tab_name not in self._NON_MODE_TABS:
            self.midi_processor.set_user_mode(tab_name)

    def _tab_changed(self, index):
        """Qt slot for tab selection changes.

        Moves the single shared scene widget between the 'Main' tab (index 0)
        and the 'Scene' tab (index 1) as needed, then reports the new tab name
        to the MIDI processor as the user mode.
        """
        new_tab_name = self.tabs.tabText(index)
        log.debug("Tab changed to %d, %s", index, new_tab_name)

        if index == 0:  # changing to main tab
            if self.scene_location == 'Scene':
                log.debug("Removing scene widget from scene layout.")
                self.sceneLayout.removeWidget(self.scene)
                self.mainLayout.insertWidget(0, self.scene)
                self.scene_location = 'Main'

        elif index == 1:  # changing to scene tab
            if self.scene_location == 'Main':
                log.debug("Removing scene widget from main layout.")
                self.mainLayout.removeWidget(self.scene)
                self.sceneLayout.insertWidget(0, self.scene)
                self.scene_location = 'Scene'

        self._notify_user_mode(new_tab_name)

    def closeEvent(self, event):
        """Qt callback received before the window closes."""
        log.info("Received window close event.")
        super(SimWindow, self).closeEvent(event)
# --------------------------------------------------------------------------------------------------
################################################################
class MainApp(object):
    """Main application controller object holding any non-GUI related state.

    Construction creates the window, the simulators and generators, the
    performance logic, and the MIDI and OSC interfaces, wires them together,
    and starts the animation timer.
    """

    # Period of the animation timer.  The same value drives both the Qt timer
    # (milliseconds) and the simulation time step (seconds).
    FRAME_INTERVAL_MS = 40

    # Mapping from the axis letters of a /winch/delta OSC message to winch channel indices.
    _WINCH_AXES = {'x': 0, 'y': 1, 'z': 2, 'a': 3}

    def __init__(self):
        # create the interface window
        self.window = SimWindow()

        # initialize the winch set simulator
        self.sim = kf.sim.SimWinch(count=8)

        # initialize the tape loop generator
        self.loops = kf.loops.ConcentricLoops()

        # initialize the particle orbit generator
        self.orbits = kf.orbits.Orbits()

        # initialize the wave system generator
        self.waves = kf.waves.Waves1D()

        # Initialize the performance logic.
        self.control_logic = ControlLogic()
        self.control_logic.connect_winches(self.sim)
        self.control_logic.connect_loops(self.loops)
        self.control_logic.connect_orbits(self.orbits)
        self.control_logic.connect_waves(self.waves)
        self.control_logic.connect_display(self.window)
        self.control_logic.connect_lights(self.window.scene)
        self.window.connect_midi_processor(self.control_logic)

        # Initialize the MIDI input system.
        self.midi_listener = kf.midi.QtMIDIListener()
        self.midi_listener.connect_midi_processor(self.control_logic)
        self.window.connect_midi_listener(self.midi_listener)

        # Initialize the MIDI output system.
        self.midi_sender = kf.midi.QtMIDISender()
        self.window.connect_midi_sender(self.midi_sender)
        self.control_logic.connect_MIDI_out(self.midi_sender)

        # Initialize the OSC message dispatch system.
        self.osc_listener = kf.osc.QtOSCListener()
        self.osc_listener.map_handler("/midi", self._received_remote_midi)
        self.osc_listener.map_handler("/winch/*", self._received_remote_winch)
        self.osc_listener.open_receiver()
        self.window.connect_osc_listener(self.osc_listener)

        # start the animation timer
        self.frame_timer = QtCore.QTimer()
        self.frame_timer.start(self.FRAME_INTERVAL_MS)  # units are milliseconds
        self.frame_timer.timeout.connect(self._timer_tick)

    def _timer_tick(self):
        """Method called at intervals by the animation timer to update the models and graphics."""
        dt = self.FRAME_INTERVAL_MS / 1000.0  # simulation time step in seconds

        # winch simulation: update the individual cartoons and the panel scene
        self.sim.update_for_interval(dt)
        winch_positions = self.sim.positions()
        for pos, cartoon in zip(winch_positions, self.window.cartoons):
            cartoon.update_position(pos)
        self.window.scene.update_positions(winch_positions)

        # tape loop generator
        self.loops.update_for_interval(dt)
        self.window.loops.update_positions(self.loops.angles())

        # particle orbit generator
        self.orbits.update_for_interval(dt)
        particles, attractors = self.orbits.get_particles_and_bodies()
        self.window.orbits.update_attractors(attractors)
        self.window.orbits.update_positions(particles)

        # wave system generator
        self.waves.update_for_interval(dt)
        self.window.waves.update_positions(self.waves.get_positions())

    # ---- process OSC network messages ---------------------------------------
    # The midi_osc_bridge.py tool can tunnel MIDI messages over the network as
    # OSC packets.  This routes any bridged MIDI data into the same performance
    # logic.
    def _received_remote_midi(self, msgaddr, *args):
        """Handle an OSC /midi message by passing its arguments to the performance logic for decoding."""
        log.debug("remote midi: %s", " ".join([str(arg) for arg in args]))
        self.control_logic.decode_message(args)

    def _received_remote_winch(self, msgaddr, *args):
        """Handle OSC messages prefixed with /winch, intended for direct
        application to the winches, bypassing the control logic.

        Only /winch/delta is acted upon.  Its first argument is a string of
        axis letters drawn from 'x', 'y', 'z', 'a' and the remaining arguments
        are the target increments.  Malformed messages are logged and ignored
        rather than raising, since they arrive from the network.
        """
        log.debug("remote winch: %s %s", msgaddr, " ".join([str(arg) for arg in args]))
        if msgaddr != '/winch/delta':
            return

        if not args:
            log.warning("Ignoring %s message with no arguments.", msgaddr)
            return

        flags = args[0]
        try:
            channels = [self._WINCH_AXES[letter] for letter in flags]
        except (KeyError, TypeError):
            # KeyError: unknown axis letter; TypeError: flags is not iterable or a letter is unhashable
            log.warning("Ignoring %s message with invalid axis flags: %r", msgaddr, flags)
            return

        self.sim.increment_target(channels, args[1:])
################################################################
def _main():
    """Create the Qt application and the main controller, then run the Qt event loop.

    The process exits with the status returned by the event loop.
    """
    # bring up the Qt system itself
    qt_app = QtWidgets.QApplication(sys.argv)

    # build the main application controller; the local name holds the object
    # for as long as the event loop runs
    controller = MainApp()

    # hand control to the event loop until the user is done
    log.info("Starting event loop.")
    exit_status = qt_app.exec_()
    sys.exit(exit_status)

################################################################
# Script entry point: the following runs only when this file is executed from
# the command line.
if __name__ == "__main__":
    _main()