Source code for script.script1
"""Demonstration script class for show1.py showing a self-contained process
for sequencing events over time. The inputs and outputs are deliberately
constrained to message queues to preclude synchronization problems and maintain
compatibility with network communication.
"""
################################################################
# Written in 2019 by Garth Zeglin <garthz@cmu.edu>
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
################################################################
import time, queue
import rcp.script
################################################################
[docs]class Script(rcp.script.Script):
def __init__(self):
super().__init__()
return
[docs] def script_task(self):
"""Entry point for the script to run on a background thread."""
self.write("Script thread waking up.")
# top-level event loop to wait for a play or reset command
while True:
try:
command = self.input.get()
if command[0] == 'console':
self.write("Script thread received user command: %s" % command[1])
if command[1] == 'reset':
self.send_reset_cue()
elif command[1] == 'play':
self.sequence()
elif command[0] == 'script':
self.write("Script thread received network command: %s" % command[1])
if command[1] == 'start':
self.sequence()
elif command[0] == 'status':
self.write("Script thread received status: %s" % repr(command))
except rcp.script.ScriptStopException:
self.write("Script stopped and idle.")
except rcp.script.ScriptTimeoutException:
self.write("Warning: script step timed out. Script idle.")
[docs] def send_show(self, *args):
self.output.put(('show',) + args)
self.write('Issuing show message: ' + repr(args))
return
[docs] def send_cue(self, *args):
self.output.put(('cue',) + args)
self.write('Issuing cue: ' + repr(args))
return
[docs] def send_pose(self, name):
self.send_cue('pose',name)
[docs] def send_reset_cue(self):
self.write("Sending reset cue.")
self.send_cue('gains', 0.5, 1.0)
self.send_cue('pose', 'reset')
self.send_cue('random', False)
self.send_cue('tempo', 60.0)
self.send_cue('magnitude', 1.0)
return
[docs] def sequence(self):
"""Demonstration sequence. This could be decomposed further into subroutines."""
self.write("Script starting.")
self.send_cue('gains', 0.5, 1.0)
self.send_pose('reset')
self.sleep(1.0)
self.send_pose('lead1')
self.wait_until_stopped()
self.send_pose('lead2')
self.wait_until_stopped()
self.send_pose('lead3')
self.wait_until_stopped()
self.send_cue('random', True)
self.sleep(5.0)
self.send_cue('magnitude', 5.0)
self.sleep(5.0)
self.send_cue('tempo', 120)
self.sleep(5.0)
self.send_cue('random', False)
self.wait_until_stopped()
self.send_pose('reset')
self.sleep(5.0)
self.send_show('done')
self.write("Script done.")
return
################################################################