Source code for ase.events

"""Common event processing code for performance scripts.

Copyright (c) 2017-2018, Garth Zeglin.  All rights reserved. Licensed under the terms
of the BSD 3-clause license.

Scripted performances are easiest to understand when written as single-threaded
scripts with normal subroutine and branching structure.  In this context,
continuing to service I/O while blocking or sleeping can be achieved by running
an I/O event loop while blocking.  These utility routines provide facilities for
this.
"""
# Enable basic compatibility features to work with either Python 2 or 3.
from __future__ import print_function, absolute_import, unicode_literals

# Standard library modules.
import time, select, sys

#================================================================
[docs]def sleep(delay=0.0, transports=None): """Sleep for a given interval while processing any data received on transports. :param delay: time in seconds to poll before returning. Default: zero. :param transports: list of transport objects. Each must implement fileno() and wait_for_data(). Default: None. """ # check for the simple case with no transport if transports is None or len(transports) == 0: time.sleep(delay) return last_time = time.time() pending = True while pending or delay > 0.0: # Check which transports have data available, using select with a short timeout. ready = select.select(transports,[], [], max(delay, 0.0)) pending = False # Process messages on each data transport with available data. If any message # is received, reflag pending to ensure checking again. for transport in ready[0]: transport.wait_for_data() pending = True # Recalculate the maximum timeout. if delay > 0.0: now = time.time() delay -= (now - last_time) # subtract elapsed time last_time = now
#================================================================
[docs]def wait_for_keyboard_input(transports=[]): """Wait until a line of text has been entered on the keyboard while processing any data received on transports. """ # add the standard input stream to the list of I/O to monitor transports += [sys.stdin] while True: # Check which transports have data available, using select with no timeout. ready = select.select(transports,[], []) # If a line of text was entered, read and return it. if sys.stdin in ready[0]: return sys.stdin.readline() # Process messages on each data transport with available data. for transport in ready[0]: transport.wait_for_data()
#================================================================
[docs]def wait_for_condition(predicate, transports=[]): """Wait until a callable predicate has returned true while processing any data received on transports. Note that this will block until data is received, so this only makes sense if the condition depends on received data. """ while predicate() is False: # Check which transports have data available, using select with no timeout. ready = select.select(transports,[], []) # Process messages on each data transport with available data. for transport in ready[0]: transport.wait_for_data()
#================================================================
[docs]def wait_for_motion_completion(transports, protocols=None): """Wait until all protocols report their motion is complete while processing any data received on transports. This uses an informal protocol in which protocols implement send_get_moving_state(callback). :param transports: list of transports to monitor :param protocols: optional list of protocols on which to wait. If None, all protocols attached to the transports are queried. """ # apply the common case in which all monitored transports are motion devices if protocols is None: protocols = [transport.protocol for transport in transports] # array of flags to indicate the protocols assumed to be moving motion_in_progress = [True]*len(protocols) # callback from a device with a Boolean status, True for 'moving' def status_received(i, flag): motion_in_progress[i] = flag if flag: # if still moving, request status again protocols[i].send_get_moving_state(lambda moving: status_received(i, moving)) # request status from every device, which will kick off the request cycle for p in range(len(protocols)): protocols[p].send_get_moving_state(lambda moving, i=p: status_received(i, moving)) # Wait for all motions to cease. Since status requests are no longer issued once a # given device reports completion, this also waits for all status requests to be answered. wait_for_condition(lambda: not any(motion_in_progress), transports) return
#================================================================