Source code for ase.events
"""Common event processing code for performance scripts.
Copyright (c) 2017-2018, Garth Zeglin. All rights reserved. Licensed under the terms
of the BSD 3-clause license.
Scripted performances are easiest to understand when written as single-threaded
scripts with normal subroutine and branching structure. In this context,
continuing to service I/O while blocking or sleeping can be achieved by running
an I/O event loop while blocking. These utility routines provide facilities for
this.
"""
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals
# Standard library modules.
import time, select, sys
#================================================================
[docs]def sleep(delay=0.0, transports=None):
"""Sleep for a given interval while processing any data received on transports.
:param delay: time in seconds to poll before returning. Default: zero.
:param transports: list of transport objects. Each must implement fileno() and wait_for_data(). Default: None.
"""
# check for the simple case with no transport
if transports is None or len(transports) == 0:
time.sleep(delay)
return
last_time = time.time()
pending = True
while pending or delay > 0.0:
# Check which transports have data available, using select with a short timeout.
ready = select.select(transports,[], [], max(delay, 0.0))
pending = False
# Process messages on each data transport with available data. If any message
# is received, reflag pending to ensure checking again.
for transport in ready[0]:
transport.wait_for_data()
pending = True
# Recalculate the maximum timeout.
if delay > 0.0:
now = time.time()
delay -= (now - last_time) # subtract elapsed time
last_time = now
#================================================================
#================================================================
[docs]def wait_for_condition(predicate, transports=[]):
"""Wait until a callable predicate has returned true while processing any data
received on transports. Note that this will block until data is received,
so this only makes sense if the condition depends on received data.
"""
while predicate() is False:
# Check which transports have data available, using select with no timeout.
ready = select.select(transports,[], [])
# Process messages on each data transport with available data.
for transport in ready[0]:
transport.wait_for_data()
#================================================================
[docs]def wait_for_motion_completion(transports, protocols=None):
"""Wait until all protocols report their motion is complete while
processing any data received on transports. This uses an informal protocol
in which protocols implement send_get_moving_state(callback).
:param transports: list of transports to monitor
:param protocols: optional list of protocols on which to wait. If None, all protocols attached to the transports are queried.
"""
# apply the common case in which all monitored transports are motion devices
if protocols is None:
protocols = [transport.protocol for transport in transports]
# array of flags to indicate the protocols assumed to be moving
motion_in_progress = [True]*len(protocols)
# callback from a device with a Boolean status, True for 'moving'
def status_received(i, flag):
motion_in_progress[i] = flag
if flag: # if still moving, request status again
protocols[i].send_get_moving_state(lambda moving: status_received(i, moving))
# request status from every device, which will kick off the request cycle
for p in range(len(protocols)):
protocols[p].send_get_moving_state(lambda moving, i=p: status_received(i, moving))
# Wait for all motions to cease. Since status requests are no longer issued once a
# given device reports completion, this also waits for all status requests to be answered.
wait_for_condition(lambda: not any(motion_in_progress), transports)
return
#================================================================