Source code for rcp.script
"""Objects related to performance scripting.
"""
################################################################
# Written in 2019 by Garth Zeglin <garthz@cmu.edu>
# To the extent possible under law, the author has dedicated all copyright
# and related and neighboring rights to this software to the public domain
# worldwide. This software is distributed without any warranty.
# You should have received a copy of the CC0 Public Domain Dedication along with this software.
# If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
################################################################
# standard Python libraries
import math, logging, functools, time, queue, threading
# set up logger for module.
log = logging.getLogger('script')
# N.B. the normal logger doesn't yet work from background threads since the
# default formatter writes to a Qt object only available on the main thread.
# filter out most logging; the default is NOTSET which passes along everything
log.setLevel(logging.INFO)
################################################################
[docs]
class ScriptTimeoutException(Exception):
pass
[docs]
class ScriptStopException(Exception):
pass
################################################################
[docs]
class Script(object):
"""A script is a procedure run as a separate thread which is permitted to block.
This supports a simpler notation for linear action sequences as normal
procedures instead of callback state machines. The script thread
communicates with the main thread solely using message queues to avoid
synchronization problems.
:ivars input: unified input queue; each item is a tuple in which the first keyword identifies the message type
:ivars output: unified output queue; each item is a tuple in which the first keyword identifies the message type
"""
def __init__(self):
self.input = queue.Queue()
self.output = queue.Queue()
return
[docs]
def start(self):
"""Start the script process in the background so will run asynchronously. This
function returns immediately."""
self.thread = threading.Thread(target=self.script_task)
self.thread.daemon = True
self.thread.start()
return
[docs]
def write(self, string):
"""Internal method to send a console message to the main thread."""
self.output.put(('console', string))
[docs]
def script_task(self):
"""Entry point for the script to run on a background thread. The default
implementation does nothing, this should be overridden in child
classes.
"""
pass
# ===============================================================
[docs]
def event_wait(self, predicate, timeout):
"""Consume input messages until one is received which satisfies the predicate or
the timeout is reached. Returns the non-False value of the predicate or
the string 'timeout'.
"""
start = time.time()
remaining = timeout
while remaining > 0:
try:
command = self.input.get(block=True, timeout=remaining)
# apply the predicate and return any non-False value
value = predicate(command)
if value is not False:
return value
self.write("event_wait ignoring message " + repr(command))
remaining = start + timeout - time.time()
except queue.Empty:
# if the queue 'get' timed out, fall out of the loop
remaining = 0
return 'timeout'
# ===============================================================
def _user_stop_predicate(self, command):
if command[0] == 'console' and command[1] == 'stop':
return 'user_stop'
else:
return False
[docs]
def sleep(self, duration):
"""Sleep for a given duration unless the user issues a stop command. Returns
nothing or raises a ScriptStopException."""
value = self.event_wait(self._user_stop_predicate, duration)
if value == 'user_stop':
raise ScriptStopException
else:
return
def _motion_stop_predicate(self, command):
if command[0] == 'status' and command[1] == 'stopped':
return 'motion_stop'
else:
return False
[docs]
def wait_until_stopped(self, timeout = 20.0):
"""Sleep until the motion generator reports a stopped status or the timeout is reached."""
predicate = lambda command: self._user_stop_predicate(command) or self._motion_stop_predicate(command)
value = self.event_wait(predicate, timeout)
if value == 'user_stop':
raise ScriptStopException
elif value == 'motion_stop':
return
else:
raise ScriptTimeoutException
################################################################